You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/11 09:08:34 UTC
[incubator-inlong] branch master updated: [INLONG-3062][Manager] Merge the data_proxy_cluster table and the third_party_cluster table (#3063)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1838ba3 [INLONG-3062][Manager] Merge the data_proxy_cluster table and the third_party_cluster table (#3063)
1838ba3 is described below
commit 1838ba3eb2720c6ada0789a192cd3a5b890a9443
Author: healchow <he...@gmail.com>
AuthorDate: Fri Mar 11 17:08:29 2022 +0800
[INLONG-3062][Manager] Merge the data_proxy_cluster table and the third_party_cluster table (#3063)
---
bin/inlong-daemon | 4 -
.../inlong/manager/common/enums/Constant.java | 3 +-
...terPageRequest.java => ClusterPageRequest.java} | 15 +-
.../common/pojo/cluster/ClusterRequest.java | 34 ++-
.../{ClusterInfo.java => ClusterResponse.java} | 24 +-
.../common/pojo/cluster/DataProxyClusterInfo.java | 70 ------
.../common/pojo/dataproxy/DataProxyClusterSet.java | 15 +-
...taProxyIpRequest.java => DataProxyRequest.java} | 8 +-
...ProxyIpResponse.java => DataProxyResponse.java} | 14 +-
.../dao/mapper/DataProxyClusterEntityMapper.java | 6 +-
.../dao/mapper/ThirdPartyClusterEntityMapper.java | 12 +-
.../mappers/DataProxyClusterEntityMapper.xml | 9 +-
.../mappers/ThirdPartyClusterEntityMapper.xml | 109 ++++----
.../manager/service/CommonOperateService.java | 15 +-
.../service/core/DataProxyClusterService.java | 78 ------
.../service/core/ThirdPartyClusterService.java | 64 +++--
.../core/impl/DataProxyClusterServiceImpl.java | 273 ---------------------
.../core/impl/ThirdPartyClusterServiceImpl.java | 270 +++++++++++++++-----
.../manager-web/sql/apache_inlong_manager.sql | 3 -
.../manager/web/controller/ClusterController.java | 71 ++----
.../controller/openapi/DataProxyController.java | 38 ++-
.../controller/openapi/OpenClusterController.java | 20 +-
22 files changed, 454 insertions(+), 701 deletions(-)
diff --git a/bin/inlong-daemon b/bin/inlong-daemon
index a9a792a..9de429d 100644
--- a/bin/inlong-daemon
+++ b/bin/inlong-daemon
@@ -172,10 +172,6 @@ start_inlong_dataproxy() {
cd $INLONG_HOME/inlong-dataproxy/bin
chmod 755 *.sh
./dataproxy-start.sh
- echo "update cluster information into data_proxy_cluster table"
- update_db_sql="UPDATE apache_inlong_manager.data_proxy_cluster SET address='"$dataproxy_ip"' WHERE name='default_dataproxy'"
- mysql -h${spring_datasource_hostname} -P${spring_datasource_port} -u${spring_datasource_username} -p${spring_datasource_password} -e "${update_db_sql}"
- echo "cluster information updated"
}
start_inlong_agent() {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 79d2fb0..9666853 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -69,7 +69,8 @@ public class Constant {
public static final String SCHEMA_M0_DAY = "m0_day";
- public static final String CLUSTER_HIVE_TOPO = "HIVE_TOPO";
+ public static final String CLUSTER_PULSAR = "PULSAR";
+ public static final String CLUSTER_DATA_PROXY = "DATA_PROXY";
public static final String ID_IS_EMPTY = "primary key is empty";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/DataProxyClusterPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterPageRequest.java
similarity index 77%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/DataProxyClusterPageRequest.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterPageRequest.java
index 8a6aa8c..18c471c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/DataProxyClusterPageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterPageRequest.java
@@ -24,12 +24,18 @@ import lombok.EqualsAndHashCode;
import org.apache.inlong.manager.common.beans.PageRequest;
/**
- * DataProxy cluster paging query conditions
+ * Cluster paging query conditions
*/
@Data
@EqualsAndHashCode(callSuper = false)
-@ApiModel("DataProxy cluster paging query conditions")
-public class DataProxyClusterPageRequest extends PageRequest {
+@ApiModel("Cluster paging query conditions")
+public class ClusterPageRequest extends PageRequest {
+
+ @ApiModelProperty(value = "Cluster type, including TUBE, PULSAR, etc.")
+ private String type;
+
+ @ApiModelProperty(value = "Cluster IP")
+ private String ip;
@ApiModelProperty(value = "Keywords, name, description, etc.")
private String keyWord;
@@ -40,4 +46,7 @@ public class DataProxyClusterPageRequest extends PageRequest {
@ApiModelProperty(value = "Current user", hidden = true)
private String currentUser;
+ @ApiModelProperty(value = "Set name of MQ cluster", hidden = true)
+ private String mqSetName;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterRequest.java
index c85b0b9..7494a87 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterRequest.java
@@ -36,19 +36,43 @@ import lombok.NoArgsConstructor;
@ApiModel("Cluster Information Query Conditions")
public class ClusterRequest {
- @ApiModelProperty(value = "cluster type")
+ @ApiModelProperty(value = "Incremental primary key")
+ private Integer id;
+
+ @ApiModelProperty(value = "Cluster name")
+ private String name;
+
+ @ApiModelProperty(value = "Cluster type, including TUBE, PULSAR, etc.")
private String type;
@ApiModelProperty(value = "Cluster IP")
private String ip;
- @ApiModelProperty(value = "Whether to backup the cluster")
- private Integer isBackup;
+ @ApiModelProperty(value = "Cluster port")
+ private Integer port;
- @ApiModelProperty(value = "Status")
- private Integer status;
+ @ApiModelProperty(value = "Cluster token")
+ private String token;
+
+ @ApiModelProperty(value = "Cluster URL address")
+ private String url;
+
+ @ApiModelProperty(value = "Whether it is a backup cluster, 0: no, 1: yes")
+ private Integer isBackup;
@ApiModelProperty(value = "MQ set name")
private String mqSetName;
+ @ApiModelProperty(value = "MQ config info")
+ private String extParams;
+
+ @ApiModelProperty(value = "Name of in charges, separated by commas")
+ private String inCharges;
+
+ @ApiModelProperty(value = "Name of in creator")
+ private String creator;
+
+ @ApiModelProperty(value = "Cluster status")
+ private Integer status;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterResponse.java
similarity index 81%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterResponse.java
index 2d52e9e..5727c2c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterResponse.java
@@ -17,18 +17,21 @@
package org.apache.inlong.manager.common.pojo.cluster;
+import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import java.util.Date;
+
/**
- * Cluster info
+ * Cluster response
*/
@Data
-@ApiModel("Cluster info")
-public class ClusterInfo {
+@ApiModel("Cluster response")
+public class ClusterResponse {
- @ApiModelProperty(value = "Incremental primary key")
+ @ApiModelProperty(value = "Primary key")
private Integer id;
@ApiModelProperty(value = "Cluster name")
@@ -61,10 +64,19 @@ public class ClusterInfo {
@ApiModelProperty(value = "Name of in charges, separated by commas")
private String inCharges;
+ @ApiModelProperty(value = "Cluster status")
+ private Integer status;
+
@ApiModelProperty(value = "Name of in creator")
private String creator;
- @ApiModelProperty(value = "Cluster status")
- private Integer status;
+ @ApiModelProperty(value = "Name of in modifier")
+ private String modifier;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date createTime;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date modifyTime;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/DataProxyClusterInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/DataProxyClusterInfo.java
deleted file mode 100644
index f882f6d..0000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/DataProxyClusterInfo.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.cluster;
-
-import com.fasterxml.jackson.annotation.JsonFormat;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-import java.util.Date;
-
-/**
- * DataProxy cluster information
- */
-@Data
-@ApiModel("DataProxy cluster information")
-public class DataProxyClusterInfo {
-
- @ApiModelProperty(value = "Primary key")
- private Integer id;
-
- @ApiModelProperty(value = "Cluster name")
- private String name;
-
- @ApiModelProperty(value = "Cluster description")
- private String description;
-
- @ApiModelProperty(value = "Cluster address")
- private String address;
-
- @ApiModelProperty(value = "Access port number, multiple ports are separated by a comma")
- private String port;
-
- @ApiModelProperty(value = "Whether it is a backup cluster, 0: no, 1: yes")
- private Integer isBackup;
-
- @ApiModelProperty(value = "Name of its mqSetName")
- private String mqSetName;
-
- @ApiModelProperty(value = "Extended params, string in JSON format")
- private String extParams;
-
- @ApiModelProperty(value = "Name of responsible person, separated by commas")
- private String inCharges;
-
- @ApiModelProperty(value = "Cluster status")
- private Integer status;
-
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- private Date createTime;
-
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- private Date modifyTime;
-
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyClusterSet.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyClusterSet.java
index 642ced7..99939ff 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyClusterSet.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyClusterSet.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -46,9 +46,9 @@ public class DataProxyClusterSet {
private Map<String, Set<String>> proxy2Cache = new HashMap<>();
//
private String defaultConfigJson;
- // Map<proxyClusterName, jsonString>
+ // key: proxyClusterName, value: jsonString
private Map<String, String> proxyConfigJson = new HashMap<>();
- // Map<proxyClusterName, md5>
+ // key: proxyClusterName, value: md5
private Map<String, String> md5Map = new HashMap<>();
/**
@@ -215,16 +215,9 @@ public class DataProxyClusterSet {
/**
* addProxy2Cache
- *
- * @param proxyClusterName
- * @param cacheClusterName
*/
public void addProxy2Cache(String proxyClusterName, String cacheClusterName) {
- Set<String> cacheNameSet = this.proxy2Cache.get(proxyClusterName);
- if (cacheNameSet == null) {
- cacheNameSet = new HashSet<>();
- this.proxy2Cache.put(proxyClusterName, cacheNameSet);
- }
+ Set<String> cacheNameSet = this.proxy2Cache.computeIfAbsent(proxyClusterName, k -> new HashSet<>());
cacheNameSet.add(cacheClusterName);
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyIpRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyRequest.java
similarity index 83%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyIpRequest.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyRequest.java
index f7351ed..3f41a08 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyIpRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyRequest.java
@@ -22,13 +22,13 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@Data
-@ApiModel("DataProxy IP request param")
-public class DataProxyIpRequest {
+@ApiModel("DataProxy request")
+public class DataProxyRequest {
- @ApiModelProperty(value = "local ip of the data proxy")
+ @ApiModelProperty(value = "IP of the data proxy")
String ip = "";
- @ApiModelProperty(value = "net tag of the data proxy, default: all")
+ @ApiModelProperty(value = "Net tag of the data proxy, default: all")
String netTag;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyIpResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyResponse.java
similarity index 77%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyIpResponse.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyResponse.java
index 333c77d..6dd8e25 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyIpResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyResponse.java
@@ -22,19 +22,19 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@Data
-@ApiModel("DataProxy IP response")
-public class DataProxyIpResponse {
+@ApiModel("DataProxy response")
+public class DataProxyResponse {
- @ApiModelProperty(value = "cluster id")
+ @ApiModelProperty(value = "Cluster id")
private Integer id;
- @ApiModelProperty(value = "cluster instance ip")
+ @ApiModelProperty(value = "Cluster IP")
private String ip;
- @ApiModelProperty(value = "cluster instance port")
- private String port;
+ @ApiModelProperty(value = "Cluster port")
+ private Integer port;
- @ApiModelProperty(value = "cluster ip type, default: all")
+ @ApiModelProperty(value = "Cluster ip type, default: all")
private String netTag;
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataProxyClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataProxyClusterEntityMapper.java
index 5c24e8c..80e1578 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataProxyClusterEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataProxyClusterEntityMapper.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.dao.mapper;
-import org.apache.inlong.manager.common.pojo.cluster.DataProxyClusterPageRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.dao.entity.DataProxyClusterEntity;
import org.springframework.stereotype.Repository;
@@ -34,9 +34,7 @@ public interface DataProxyClusterEntityMapper {
DataProxyClusterEntity selectByPrimaryKey(Integer id);
- List<DataProxyClusterEntity> selectAll();
-
- List<DataProxyClusterEntity> selectByCondition(DataProxyClusterPageRequest request);
+ List<DataProxyClusterEntity> selectByCondition(ClusterPageRequest request);
int updateByPrimaryKeySelective(DataProxyClusterEntity record);
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
index 9f0bfc0..71f07ff 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.dao.mapper;
import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity;
import org.springframework.stereotype.Repository;
@@ -27,20 +27,24 @@ import java.util.List;
@Repository
public interface ThirdPartyClusterEntityMapper {
- int deleteByPrimaryKey(Integer id);
-
int insert(ThirdPartyClusterEntity record);
int insertSelective(ThirdPartyClusterEntity record);
ThirdPartyClusterEntity selectByPrimaryKey(Integer id);
+ List<ThirdPartyClusterEntity> selectByCondition(ClusterPageRequest request);
+
List<ThirdPartyClusterEntity> selectByIdList(@Param("idList") List<Integer> idList);
+ List<ThirdPartyClusterEntity> selectByType(@Param("type") String type);
+
+ ThirdPartyClusterEntity selectByName(@Param("name") String name);
+
int updateByPrimaryKeySelective(ThirdPartyClusterEntity record);
int updateByPrimaryKey(ThirdPartyClusterEntity record);
- List<ThirdPartyClusterEntity> selectByCondition(ClusterRequest request);
+ int deleteByPrimaryKey(Integer id);
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DataProxyClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DataProxyClusterEntityMapper.xml
index da83a38..fdece0b 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/DataProxyClusterEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/DataProxyClusterEntityMapper.xml
@@ -48,14 +48,9 @@
from data_proxy_cluster
where id = #{id,jdbcType=INTEGER}
</select>
- <select id="selectAll" resultType="org.apache.inlong.manager.dao.entity.DataProxyClusterEntity">
- select
- <include refid="Base_Column_List"/>
- from data_proxy_cluster
- where is_deleted = 0
- </select>
+
<select id="selectByCondition" resultType="org.apache.inlong.manager.dao.entity.DataProxyClusterEntity"
- parameterType="org.apache.inlong.manager.common.pojo.cluster.DataProxyClusterPageRequest">
+ parameterType="org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest">
select
<include refid="Base_Column_List"/>
from data_proxy_cluster
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
index cb07903..8b871ec 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
@@ -43,17 +43,7 @@
id, name, type, ip, port, url, token, is_backup, mq_set_name, ext_params, in_charges,
status, is_deleted, creator, modifier, create_time, modify_time
</sql>
- <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
- select
- <include refid="Base_Column_List"/>
- from third_party_cluster
- where id = #{id,jdbcType=INTEGER}
- </select>
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
- delete
- from third_party_cluster
- where id = #{id,jdbcType=INTEGER}
- </delete>
+
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
insert into third_party_cluster (id, name, type,
@@ -179,6 +169,63 @@
</if>
</trim>
</insert>
+
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from third_party_cluster
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <select id="selectByCondition" parameterType="org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest"
+ resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from third_party_cluster
+ <where>
+ is_deleted = 0
+ <if test="type != null and type != ''">
+ and `type` = #{type, jdbcType=VARCHAR}
+ </if>
+ <if test="ip != null and ip != ''">
+ and ip like CONCAT('%',#{ip},'%')
+ </if>
+ <if test="status != null and status != ''">
+ and status = #{status, jdbcType=INTEGER}
+ </if>
+ <if test="mqSetName != null and mqSetName != ''">
+ and mq_set_name = #{mqSetName, jdbcType=VARCHAR}
+ </if>
+ </where>
+ order by modify_time desc
+ </select>
+ <select id="selectByIdList" resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from third_party_cluster
+ <where>
+ <if test="idList != null and idList.size()>0">
+ and id in
+ <foreach item="item" index="index" collection="idList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </if>
+ </where>
+ </select>
+ <select id="selectByType" resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from third_party_cluster
+ where is_deleted = 0
+ and `type` = #{type, jdbcType=VARCHAR}
+ </select>
+ <select id="selectByName" resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from third_party_cluster
+ where is_deleted = 0
+ and name = #{name, jdbcType=VARCHAR}
+ </select>
+
<update id="updateByPrimaryKeySelective"
parameterType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
update third_party_cluster
@@ -255,41 +302,9 @@
where id = #{id,jdbcType=INTEGER}
</update>
- <select id="selectByCondition" parameterType="org.apache.inlong.manager.common.pojo.cluster.ClusterRequest"
- resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
- select
- <include refid="Base_Column_List"/>
- from third_party_cluster
- <where>
- is_deleted = 0
- <if test="type != null and type != ''">
- and `type` = #{type, jdbcType=VARCHAR}
- </if>
- <if test="ip != null and ip != ''">
- and ip like CONCAT('%',#{ip},'%')
- </if>
- <if test="isBackup != null and isBackup != ''">
- and is_backup = #{isBackup, jdbcType=INTEGER}
- </if>
- <if test="status != null and status != ''">
- and status = #{status, jdbcType=INTEGER}
- </if>
- <if test="mqSetName != null and mqSetName != ''">
- and mq_set_name = #{mqSetName, jdbcType=VARCHAR}
- </if>
- </where>
- </select>
- <select id="selectByIdList" resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
- select
- <include refid="Base_Column_List"/>
+ <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ delete
from third_party_cluster
- <where>
- <if test="idList != null and idList.size()>0">
- and id in
- <foreach item="item" index="index" collection="idList" open="(" close=")" separator=",">
- #{item}
- </foreach>
- </if>
- </where>
- </select>
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index 3f0b322..b171f90 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -28,7 +28,7 @@ import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
@@ -141,9 +141,9 @@ public class CommonOperateService {
* @param type Cluster type, such as TUBE, PULSAR, etc.
*/
private ThirdPartyClusterEntity getThirdPartyCluster(String type) {
- InlongGroupPageRequest request = new InlongGroupPageRequest();
- request.setMiddlewareType(type);
- List<InlongGroupEntity> groupEntities = groupMapper.selectByCondition(request);
+ InlongGroupPageRequest groupPageRequest = new InlongGroupPageRequest();
+ groupPageRequest.setMiddlewareType(type);
+ List<InlongGroupEntity> groupEntities = groupMapper.selectByCondition(groupPageRequest);
if (groupEntities.isEmpty()) {
LOGGER.warn("no inlong group found by type={}", type);
return null;
@@ -157,9 +157,10 @@ public class CommonOperateService {
}
String mqSetName = dataProxyCluster.getMqSetName();
- ClusterRequest mqNameRequest = ClusterRequest.builder().mqSetName(mqSetName).build();
- List<ThirdPartyClusterEntity> thirdPartyClusters = thirdPartyClusterMapper.selectByCondition(mqNameRequest);
- if (thirdPartyClusters.isEmpty()) {
+ ClusterPageRequest clusterRequest = new ClusterPageRequest();
+ clusterRequest.setMqSetName(mqSetName);
+ List<ThirdPartyClusterEntity> thirdPartyClusters = thirdPartyClusterMapper.selectByCondition(clusterRequest);
+ if (CollectionUtils.isEmpty(thirdPartyClusters)) {
LOGGER.warn("no related third-party-cluster by type={} and mq set name={}", type, mqSetName);
return null;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java
index 0f23cca..c235ff3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java
@@ -17,94 +17,16 @@
package org.apache.inlong.manager.service.core;
-import com.github.pagehelper.PageInfo;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
-import org.apache.inlong.manager.common.pojo.cluster.DataProxyClusterInfo;
-import org.apache.inlong.manager.common.pojo.cluster.DataProxyClusterPageRequest;
-import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpRequest;
-import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpResponse;
-
-import java.util.List;
-
/**
* DataProxy cluster service layer interface
*/
public interface DataProxyClusterService {
/**
- * Save DataProxy cluster information
- *
- * @param clusterInfo Cluster information
- * @param operator Current operator
- * @return ID after saving
- */
- Integer save(DataProxyClusterInfo clusterInfo, String operator);
-
- /**
- * Save DataProxy cluster information
- *
- * @param id Cluster ID
- * @return Cluster information succeed
- */
- DataProxyClusterInfo get(Integer id);
-
- /**
- * Query DataProxy cluster list according to conditions
- *
- * @param request Query conditions
- * @return DataProxy cluster list
- */
- PageInfo<DataProxyClusterInfo> listByCondition(DataProxyClusterPageRequest request);
-
- /**
- * Change DataProxy cluster information
- *
- * @param clusterInfo The information to be modified
- * @param operator Current operator
- * @return Whether succeed
- */
- Boolean update(DataProxyClusterInfo clusterInfo, String operator);
-
- /**
- * Delete DataProxy cluster information
- *
- * @param id Cluster ID to be deleted
- * @param operator Current operator
- * @return Whether succeed
- */
- Boolean delete(Integer id, String operator);
-
- /**
- * Query data proxy ip list
- *
- * @param request query request param
- * @return data proxy ip list
- */
- List<DataProxyIpResponse> getIpList(DataProxyIpRequest request);
-
- /**
* query data proxy config by cluster id
*
* @return data proxy config
*/
- List<DataProxyConfig> getConfig();
-
- /**
- * query data proxy config by cluster id
- *
- * @param clusterName
- * @param setName
- * @param md5
- * @return data proxy config
- */
String getAllConfig(String clusterName, String setName, String md5);
- /**
- * query data proxy config by cluster id, result includes pulsar cluster configs and topic etc
- *
- * @param dataproxyClusterName
- * @return
- */
- ThirdPartyClusterDTO getConfigV2(String dataproxyClusterName);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ThirdPartyClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ThirdPartyClusterService.java
index e14e254..96e33d8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ThirdPartyClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ThirdPartyClusterService.java
@@ -17,8 +17,14 @@
package org.apache.inlong.manager.service.core;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
+import com.github.pagehelper.PageInfo;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
+import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterResponse;
+import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyRequest;
+import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyResponse;
import java.util.List;
@@ -27,56 +33,70 @@ import java.util.List;
*/
public interface ThirdPartyClusterService {
- List<String> listClusterIpByType(String type);
/**
- * Find cluster information based on type
+ * Save cluster info.
*
- * @param request Query conditions
- * @return Cluster information
+ * @param request Cluster info.
+ * @param operator Current operator.
+ * @return ID after saving.
*/
- List<ClusterInfo> list(ClusterRequest request);
+ Integer save(ClusterRequest request, String operator);
/**
- * Query cluster information according to the list of cluster IDs
+ * Get cluster info by id.
*
- * @param clusterIdList Cluster ID list
- * @return Cluster information
+ * @param id Cluster ID.
+ * @return Cluster info.
*/
- List<ClusterInfo> getClusterInfoByIdList(List<Integer> clusterIdList);
+ ClusterResponse get(Integer id);
/**
- * Save cluster information
+ * Paging query clusters according to conditions.
*
- * @param clusterInfo Cluster information
- * @param operator Current operator
- * @return ID after saving
+ * @param request Query conditions.
+ * @return Cluster list.
*/
- Integer save(ClusterInfo clusterInfo, String operator);
+ PageInfo<ClusterResponse> list(ClusterPageRequest request);
+
+ List<String> listClusterIpByType(String type);
/**
* Change cluster information
*
- * @param clusterInfo The information to be modified
+ * @param request The cluster info to be modified
* @param operator Current operator
* @return Whether succeed
*/
- Boolean update(ClusterInfo clusterInfo, String operator);
+ Boolean update(ClusterRequest request, String operator);
/**
* Delete cluster information
*
- * @param id Cluster ID to be deleted
+ * @param id Cluster ID to be deleted
* @param operator Current operator
* @return Whether succeed
*/
Boolean delete(Integer id, String operator);
/**
- * Save cluster information
+ * Query data proxy ip list
+ *
+ * @param request query request param
+ * @return data proxy ip list
+ */
+ List<DataProxyResponse> getIpList(DataProxyRequest request);
+
+ /**
+ * query data proxy config by cluster id
*
- * @param id Cluster ID
- * @return Cluster information succeed
+ * @return data proxy config
+ */
+ List<DataProxyConfig> getConfig();
+
+ /**
+ * query data proxy config by cluster id, result includes pulsar cluster configs and topic etc
*/
- ClusterInfo get(Integer id);
+ ThirdPartyClusterDTO getConfigV2(String dataproxyClusterName);
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
index 1bcac9e..408a66b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
@@ -17,50 +17,14 @@
package org.apache.inlong.manager.service.core.impl;
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageHelper;
-import com.github.pagehelper.PageInfo;
import com.google.gson.Gson;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
-import org.apache.inlong.manager.common.enums.EntityStatus;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
-import org.apache.inlong.manager.common.pojo.cluster.DataProxyClusterInfo;
-import org.apache.inlong.manager.common.pojo.cluster.DataProxyClusterPageRequest;
import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyClusterSet;
-import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpRequest;
-import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpResponse;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.DataProxyClusterEntity;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
-import org.apache.inlong.manager.dao.entity.InlongGroupPulsarEntity;
-import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
-import org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity;
-import org.apache.inlong.manager.dao.mapper.DataProxyClusterEntityMapper;
-import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
-import org.apache.inlong.manager.dao.mapper.InlongGroupPulsarEntityMapper;
-import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
-import org.apache.inlong.manager.dao.mapper.ThirdPartyClusterEntityMapper;
import org.apache.inlong.manager.service.core.DataProxyClusterService;
import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
/**
* DataProxy cluster service layer implementation class
@@ -69,245 +33,8 @@ import org.springframework.transaction.annotation.Transactional;
@Slf4j
public class DataProxyClusterServiceImpl implements DataProxyClusterService {
- private static final Logger LOGGER = LoggerFactory.getLogger(DataProxyClusterServiceImpl.class);
-
- @Autowired
- private DataProxyClusterEntityMapper dataProxyClusterMapper;
- @Autowired
- private InlongGroupEntityMapper groupMapper;
- @Autowired
- private InlongGroupPulsarEntityMapper inlongGroupPulsarEntityMapper;
- @Autowired
- private InlongStreamEntityMapper streamMapper;
@Autowired
private DataProxyConfigRepository proxyRepository;
- @Autowired
- private ClusterBean clusterBean;
- @Autowired
- private ThirdPartyClusterEntityMapper thirdPartyClusterEntityMapper;
-
- @Transactional(rollbackFor = Throwable.class)
- @Override
- public Integer save(DataProxyClusterInfo clusterInfo, String operator) {
- LOGGER.info("begin to save data proxy cluster={}", clusterInfo);
- Preconditions.checkNotNull(clusterInfo, "data proxy cluster is empty");
-
- DataProxyClusterEntity entity = CommonBeanUtils.copyProperties(clusterInfo, DataProxyClusterEntity::new);
-
- entity.setCreator(operator);
- entity.setModifier(operator);
- entity.setCreateTime(new Date());
- dataProxyClusterMapper.insertSelective(entity);
-
- LOGGER.info("success to save data proxy cluster");
- return entity.getId();
- }
-
- @Override
- public DataProxyClusterInfo get(Integer id) {
- LOGGER.info("begin to get data proxy cluster by id={}", id);
- Preconditions.checkNotNull(id, "data proxy cluster id is empty");
-
- DataProxyClusterEntity entity = dataProxyClusterMapper.selectByPrimaryKey(id);
- if (entity == null) {
- LOGGER.error("data proxy cluster not found by id={}", id);
- throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
- }
-
- DataProxyClusterInfo clusterInfo = CommonBeanUtils.copyProperties(entity, DataProxyClusterInfo::new);
-
- LOGGER.info("success to get data proxy cluster info");
- return clusterInfo;
- }
-
- @Override
- public PageInfo<DataProxyClusterInfo> listByCondition(DataProxyClusterPageRequest request) {
- LOGGER.info("begin to list data proxy cluster by {}", request);
-
- PageHelper.startPage(request.getPageNum(), request.getPageSize());
- Page<DataProxyClusterEntity> entityPage = (Page<DataProxyClusterEntity>) dataProxyClusterMapper
- .selectByCondition(request);
- List<DataProxyClusterInfo> clusterList = CommonBeanUtils.copyListProperties(entityPage,
- DataProxyClusterInfo::new);
- // Encapsulate the paging query results into the PageInfo object to obtain
- // related paging information
- PageInfo<DataProxyClusterInfo> page = new PageInfo<>(clusterList);
- page.setTotal(entityPage.getTotal());
-
- LOGGER.info("success to list data proxy cluster");
- return page;
- }
-
- @Transactional(rollbackFor = Throwable.class)
- @Override
- public Boolean update(DataProxyClusterInfo clusterInfo, String operator) {
- LOGGER.info("begin to update data proxy cluster={}", clusterInfo);
- Preconditions.checkNotNull(clusterInfo, "data proxy cluster is empty");
- Integer id = clusterInfo.getId();
- Preconditions.checkNotNull(id, "data proxy cluster id is empty");
-
- DataProxyClusterEntity entity = dataProxyClusterMapper.selectByPrimaryKey(id);
- if (entity == null) {
- LOGGER.error("data proxy cluster not found by id={}", id);
- throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
- }
-
- CommonBeanUtils.copyProperties(clusterInfo, entity, true);
- entity.setModifier(operator);
- dataProxyClusterMapper.updateByPrimaryKeySelective(entity);
-
- LOGGER.info("success to update data proxy cluster");
- return true;
- }
-
- @Transactional(rollbackFor = Throwable.class)
- @Override
- public Boolean delete(Integer id, String operator) {
- LOGGER.info("begin to delete data proxy cluster by id={}", id);
- Preconditions.checkNotNull(id, "data proxy cluster id is empty");
-
- DataProxyClusterEntity entity = dataProxyClusterMapper.selectByPrimaryKey(id);
- if (entity == null) {
- LOGGER.error("data proxy cluster not found by id={}", id);
- throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
- }
-
- entity.setIsDeleted(EntityStatus.IS_DELETED.getCode());
- entity.setStatus(EntityStatus.DELETED.getCode());
- entity.setModifier(operator);
- dataProxyClusterMapper.updateByPrimaryKey(entity);
-
- LOGGER.info("success to delete data proxy cluster");
- return true;
- }
-
- @Override
- public List<DataProxyIpResponse> getIpList(DataProxyIpRequest request) {
- LOGGER.debug("begin to get data proxy ip list, request: {}", request);
- List<DataProxyClusterEntity> entityList = dataProxyClusterMapper.selectAll();
- if (entityList == null || entityList.isEmpty()) {
- LOGGER.info("success to get data proxy ip list, but result is empty, request ip={}", request.getIp());
- return null;
- }
-
- List<DataProxyIpResponse> responseList = new ArrayList<>();
- for (DataProxyClusterEntity entity : entityList) {
- DataProxyIpResponse response = new DataProxyIpResponse();
- response.setId(entity.getId());
- response.setPort(entity.getPort());
- response.setIp(entity.getAddress());
-
- responseList.add(response);
- }
-
- LOGGER.info("success to get data proxy ip list, response size={}", responseList.size());
- return responseList;
- }
-
- @Override
- public List<DataProxyConfig> getConfig() {
- // get all configs with inlong group status of 130, that is, config successful
- // TODO Optimize query conditions
- List<InlongGroupEntity> bizEntityList = groupMapper.selectAll(EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode());
- List<DataProxyConfig> configList = new ArrayList<>();
- for (InlongGroupEntity entity : bizEntityList) {
- String groupId = entity.getInlongGroupId();
- String bizResource = entity.getMqResourceObj();
-
- DataProxyConfig config = new DataProxyConfig();
- config.setM(entity.getSchemaName());
- if (Constant.MIDDLEWARE_TUBE.equals(entity.getMiddlewareType())) {
- config.setInlongGroupId(groupId);
- config.setTopic(bizResource);
- } else if (Constant.MIDDLEWARE_PULSAR.equals(entity.getMiddlewareType())) {
- List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
- for (InlongStreamEntity stream : streamList) {
- String topic = stream.getMqResourceObj();
- String streamId = stream.getInlongStreamId();
- config.setInlongGroupId(groupId + "/" + streamId);
- config.setTopic("persistent://" + clusterBean.getDefaultTenant() + "/" + bizResource + "/" + topic);
- }
- }
- configList.add(config);
- }
-
- return configList;
- }
-
- /**
- * query data proxy config by cluster name, result includes pulsar/tube cluster configs and topic etc
- */
- @Override
- public ThirdPartyClusterDTO getConfigV2(String dataproxyClusterName) {
-
- List<ThirdPartyClusterInfo> mqSet = new ArrayList<>();
- List<DataProxyConfig> topicList = new ArrayList<>();
-
- final DataProxyClusterEntity dataProxyClusterEntity = dataProxyClusterMapper.selectByName(dataproxyClusterName);
-
- // TODO Optimize query conditions use dataProxyClusterId
- List<InlongGroupEntity> groupEntities = groupMapper.selectAll(EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode());
-// List<String> groupIdList = groupMapper.selectGroupIdByProxyId(dataProxyClusterEntity.getId());
- ClusterRequest request = ClusterRequest.builder().mqSetName(dataProxyClusterEntity.getMqSetName()).build();
- List<ThirdPartyClusterEntity> clusterInfoEntities = thirdPartyClusterEntityMapper
- .selectByCondition(request);
-
- // third-party-cluster type
- String middlewareType = "";
- if (!groupEntities.isEmpty()) {
- middlewareType = groupEntities.get(0).getMiddlewareType();
- }
-
-// if (!groupIdList.isEmpty()) {
-// middlewareType = groupMapper.selectByGroupId(groupIdList.get(0)).getMiddlewareType();
-// }
-
- // based on group id, get topic list
- for (InlongGroupEntity inlongGroupEntity : groupEntities) {
-// for (String groupId : groupIdList) {
- final String groupId = inlongGroupEntity.getInlongGroupId();
- final String mqResource = inlongGroupEntity.getMqResourceObj();
- if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType)) {
- List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
- for (InlongStreamEntity stream : streamList) {
- DataProxyConfig topicConfig = new DataProxyConfig();
- String streamId = stream.getInlongStreamId();
- String topic = stream.getMqResourceObj();
- String tenant = clusterBean.getDefaultTenant();
- InlongGroupPulsarEntity pulsarEntity = inlongGroupPulsarEntityMapper.selectByGroupId(groupId);
- if (pulsarEntity != null && StringUtils.isNotEmpty(pulsarEntity.getTenant())) {
- tenant = pulsarEntity.getTenant();
- }
- topicConfig.setInlongGroupId(groupId + "/" + streamId);
- topicConfig.setTopic("persistent://" + tenant + "/" + mqResource + "/" + topic);
- topicList.add(topicConfig);
- }
- } else if (Constant.MIDDLEWARE_TUBE.equals(middlewareType)) {
- DataProxyConfig topicConfig = new DataProxyConfig();
- topicConfig.setInlongGroupId(groupId);
- topicConfig.setTopic(mqResource);
- topicList.add(topicConfig);
-
- }
- }
- // construct pulsarSet info
- Gson gson = new Gson();
- for (ThirdPartyClusterEntity cluster : clusterInfoEntities) {
- ThirdPartyClusterInfo clusterInfo = new ThirdPartyClusterInfo();
- clusterInfo.setUrl(cluster.getUrl());
- clusterInfo.setToken(cluster.getToken());
- Map<String, String> configParams = gson.fromJson(cluster.getExtParams(), Map.class);
- clusterInfo.setParams(configParams);
-
- mqSet.add(clusterInfo);
- }
-
- ThirdPartyClusterDTO object = new ThirdPartyClusterDTO();
- object.setMqSet(mqSet);
- object.setTopicList(topicList);
-
- return object;
- }
/**
* query data proxy config by cluster id
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
index a4040f3..f10276f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
@@ -17,110 +17,149 @@
package org.apache.inlong.manager.service.core.impl;
-import lombok.extern.slf4j.Slf4j;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
+import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
+import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterInfo;
+import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterResponse;
+import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyRequest;
+import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupPulsarEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupPulsarEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.ThirdPartyClusterEntityMapper;
import org.apache.inlong.manager.service.core.ThirdPartyClusterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.List;
+import java.util.Map;
/**
* Implementation of cluster service
*/
@Service
-@Slf4j
public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
private static final Logger LOGGER = LoggerFactory.getLogger(ThirdPartyClusterServiceImpl.class);
+ private static final Gson GSON = new Gson();
@Autowired
- private ThirdPartyClusterEntityMapper thirdPartyClusterEntityMapper;
+ private ClusterBean clusterBean;
+ @Autowired
+ private InlongGroupEntityMapper groupMapper;
+ @Autowired
+ private InlongGroupPulsarEntityMapper pulsarEntityMapper;
+ @Autowired
+ private InlongStreamEntityMapper streamMapper;
+ @Autowired
+ private ThirdPartyClusterEntityMapper thirdPartyClusterMapper;
@Override
- public List<String> listClusterIpByType(String type) {
- ClusterRequest request = ClusterRequest.builder().type(type).build();
- List<ThirdPartyClusterEntity> clusterInfoEntities = thirdPartyClusterEntityMapper.selectByCondition(request);
- List<String> ipList = new ArrayList<>(clusterInfoEntities.size());
- for (ThirdPartyClusterEntity entity : clusterInfoEntities) {
- ipList.add(entity.getIp());
+ @Transactional(rollbackFor = Throwable.class)
+ public Integer save(ClusterRequest request, String operator) {
+ LOGGER.info("begin to insert a cluster info cluster={}", request);
+ Preconditions.checkNotNull(request, "cluster is empty");
+ ThirdPartyClusterEntity entity = CommonBeanUtils.copyProperties(request, ThirdPartyClusterEntity::new);
+ if (operator != null) {
+ entity.setCreator(operator);
}
- return ipList;
+ Preconditions.checkNotNull(entity.getCreator(), "cluster creator is empty");
+ entity.setCreateTime(new Date());
+ entity.setIsDeleted(Constant.UN_DELETED);
+ thirdPartyClusterMapper.insert(entity);
+ LOGGER.info("success to add a cluster");
+ return entity.getId();
}
@Override
- public List<ClusterInfo> list(ClusterRequest request) {
- LOGGER.info("begin to list cluster by request={}", request);
-
- List<ThirdPartyClusterEntity> entityList = thirdPartyClusterEntityMapper.selectByCondition(request);
- List<ClusterInfo> infoList = CommonBeanUtils.copyListProperties(entityList, ClusterInfo::new);
-
- LOGGER.info("success to get cluster");
- return infoList;
+ public ClusterResponse get(Integer id) {
+ LOGGER.info("begin to get cluster by id={}", id);
+ Preconditions.checkNotNull(id, "cluster id is empty");
+ ThirdPartyClusterEntity entity = thirdPartyClusterMapper.selectByPrimaryKey(id);
+ if (entity == null) {
+ LOGGER.error("cluster not found by id={}", id);
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+ }
+ ClusterResponse clusterResponse = CommonBeanUtils.copyProperties(entity, ClusterResponse::new);
+ LOGGER.info("success to get cluster info");
+ return clusterResponse;
}
@Override
- public List<ClusterInfo> getClusterInfoByIdList(List<Integer> clusterIdList) {
- if (CollectionUtils.isEmpty(clusterIdList)) {
- return Collections.emptyList();
- }
- List<ThirdPartyClusterEntity> entityList = thirdPartyClusterEntityMapper.selectByIdList(clusterIdList);
- return CommonBeanUtils.copyListProperties(entityList, ClusterInfo::new);
+ public PageInfo<ClusterResponse> list(ClusterPageRequest request) {
+ PageHelper.startPage(request.getPageNum(), request.getPageSize());
+ Page<ThirdPartyClusterEntity> entityPage = (Page<ThirdPartyClusterEntity>)
+ thirdPartyClusterMapper.selectByCondition(request);
+ List<ClusterResponse> clusterList = CommonBeanUtils.copyListProperties(entityPage,
+ ClusterResponse::new);
+ PageInfo<ClusterResponse> page = new PageInfo<>(clusterList);
+ page.setTotal(entityPage.getTotal());
+
+ LOGGER.debug("success to list cluster by {}", request);
+ return page;
}
@Override
- public Integer save(ClusterInfo clusterInfo, String operator) {
- LOGGER.info("begin to insert a cluster info cluster={}", clusterInfo);
- Preconditions.checkNotNull(clusterInfo, "cluster is empty");
- ThirdPartyClusterEntity entity = CommonBeanUtils.copyProperties(clusterInfo, ThirdPartyClusterEntity::new);
- if (operator != null) {
- entity.setCreator(operator);
+ public List<String> listClusterIpByType(String type) {
+ ClusterPageRequest request = new ClusterPageRequest();
+ request.setType(type);
+ List<ThirdPartyClusterEntity> entityList = thirdPartyClusterMapper.selectByCondition(request);
+ List<String> ipList = new ArrayList<>(entityList.size());
+ for (ThirdPartyClusterEntity entity : entityList) {
+ ipList.add(entity.getIp());
}
- entity.setCreateTime(new Date());
- entity.setIsDeleted(Constant.UN_DELETED);
- thirdPartyClusterEntityMapper.insert(entity);
- LOGGER.info("success to add a cluster");
- return entity.getId();
+ return ipList;
}
@Override
- public Boolean update(ClusterInfo clusterInfo, String operator) {
- LOGGER.info("begin to update common cluster={}", clusterInfo);
- Preconditions.checkNotNull(clusterInfo, "cluster is empty");
- Integer id = clusterInfo.getId();
+ @Transactional(rollbackFor = Throwable.class)
+ public Boolean update(ClusterRequest request, String operator) {
+ Preconditions.checkNotNull(request, "cluster is empty");
+ Integer id = request.getId();
Preconditions.checkNotNull(id, "cluster id is empty");
- ThirdPartyClusterEntity entity = thirdPartyClusterEntityMapper.selectByPrimaryKey(id);
+ ThirdPartyClusterEntity entity = thirdPartyClusterMapper.selectByPrimaryKey(id);
if (entity == null) {
LOGGER.error("cluster not found by id={}", id);
throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
}
- CommonBeanUtils.copyProperties(clusterInfo, entity, true);
+ CommonBeanUtils.copyProperties(request, entity, true);
entity.setModifier(operator);
- thirdPartyClusterEntityMapper.updateByPrimaryKeySelective(entity);
- LOGGER.info("success to update cluster");
+ thirdPartyClusterMapper.updateByPrimaryKeySelective(entity);
+
+ LOGGER.info("success to update cluster={}", request);
return true;
}
@Override
+ @Transactional(rollbackFor = Throwable.class)
public Boolean delete(Integer id, String operator) {
- LOGGER.info("begin to delete cluster by id={}", id);
Preconditions.checkNotNull(id, "cluster id is empty");
- ThirdPartyClusterEntity entity = thirdPartyClusterEntityMapper.selectByPrimaryKey(id);
+ ThirdPartyClusterEntity entity = thirdPartyClusterMapper.selectByPrimaryKey(id);
if (entity == null) {
LOGGER.error("cluster not found by id={}", id);
throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
@@ -128,23 +167,136 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
entity.setIsDeleted(id);
entity.setStatus(EntityStatus.DELETED.getCode());
entity.setModifier(operator);
- thirdPartyClusterEntityMapper.updateByPrimaryKey(entity);
- LOGGER.info("success to delete cluster");
+ thirdPartyClusterMapper.updateByPrimaryKey(entity);
+ LOGGER.info("success to delete cluster by id={}", id);
return true;
}
@Override
- public ClusterInfo get(Integer id) {
- LOGGER.info("begin to get cluster by id={}", id);
- Preconditions.checkNotNull(id, "cluster id is empty");
- ThirdPartyClusterEntity entity = thirdPartyClusterEntityMapper.selectByPrimaryKey(id);
- if (entity == null) {
- LOGGER.error("cluster not found by id={}", id);
- throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+ public List<DataProxyResponse> getIpList(DataProxyRequest request) {
+ LOGGER.debug("begin to list data proxy by request={}", request);
+ List<ThirdPartyClusterEntity> entityList = thirdPartyClusterMapper.selectByType(Constant.CLUSTER_DATA_PROXY);
+ if (entityList == null || entityList.isEmpty()) {
+ LOGGER.warn("success to list data proxy, but not found anything for request={}", request);
+ return null;
}
- ClusterInfo clusterInfo = CommonBeanUtils.copyProperties(entity, ClusterInfo::new);
- LOGGER.info("success to get cluster info");
- return clusterInfo;
+
+ List<DataProxyResponse> responseList = new ArrayList<>();
+ for (ThirdPartyClusterEntity entity : entityList) {
+ DataProxyResponse response = new DataProxyResponse();
+ response.setId(entity.getId());
+ response.setPort(entity.getPort());
+ response.setIp(entity.getIp());
+
+ responseList.add(response);
+ }
+
+ LOGGER.debug("success to list data proxy cluster={}", responseList);
+ return responseList;
+ }
+
+ @Override
+ public List<DataProxyConfig> getConfig() {
+ // get all configs with inlong group status of 130, that is, config successful
+ // TODO Optimize query conditions
+ List<InlongGroupEntity> groupEntityList = groupMapper.selectAll(GroupState.CONFIG_SUCCESSFUL.getCode());
+ List<DataProxyConfig> configList = new ArrayList<>();
+ for (InlongGroupEntity groupEntity : groupEntityList) {
+ String groupId = groupEntity.getInlongGroupId();
+ String bizResource = groupEntity.getMqResourceObj();
+
+ DataProxyConfig config = new DataProxyConfig();
+ config.setM(groupEntity.getSchemaName());
+ if (Constant.MIDDLEWARE_TUBE.equals(groupEntity.getMiddlewareType())) {
+ config.setInlongGroupId(groupId);
+ config.setTopic(bizResource);
+ } else if (Constant.MIDDLEWARE_PULSAR.equals(groupEntity.getMiddlewareType())) {
+ List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
+ for (InlongStreamEntity stream : streamList) {
+ String topic = stream.getMqResourceObj();
+ String streamId = stream.getInlongStreamId();
+ config.setInlongGroupId(groupId + "/" + streamId);
+ config.setTopic("persistent://" + clusterBean.getDefaultTenant() + "/" + bizResource + "/" + topic);
+ }
+ }
+ configList.add(config);
+ }
+
+ return configList;
+ }
+
+ /**
+ * query data proxy config by cluster name, result includes pulsar/tube cluster configs and topic etc
+ */
+ @Override
+ public ThirdPartyClusterDTO getConfigV2(String clusterName) {
+ ThirdPartyClusterEntity clusterEntity = thirdPartyClusterMapper.selectByName(clusterName);
+ if (clusterEntity == null) {
+ throw new BusinessException("data proxy cluster not found by name=" + clusterName);
+ }
+
+ // TODO Optimize query conditions use dataProxyClusterId
+ List<InlongGroupEntity> groupEntityList = groupMapper.selectAll(GroupState.CONFIG_SUCCESSFUL.getCode());
+ if (CollectionUtils.isEmpty(groupEntityList)) {
+ String msg = "not found any inlong group with success status for proxy cluster name = " + clusterName;
+ LOGGER.warn(msg);
+ throw new BusinessException(msg);
+ }
+
+ // third-party-cluster type
+ String middlewareType = "";
+ if (!groupEntityList.isEmpty()) {
+ middlewareType = groupEntityList.get(0).getMiddlewareType();
+ }
+
+ // Get topic list by group id
+ List<DataProxyConfig> topicList = new ArrayList<>();
+ for (InlongGroupEntity groupEntity : groupEntityList) {
+ final String groupId = groupEntity.getInlongGroupId();
+ final String mqResource = groupEntity.getMqResourceObj();
+ if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType)) {
+ List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
+ for (InlongStreamEntity stream : streamList) {
+ DataProxyConfig topicConfig = new DataProxyConfig();
+ String streamId = stream.getInlongStreamId();
+ String topic = stream.getMqResourceObj();
+ String tenant = clusterBean.getDefaultTenant();
+ InlongGroupPulsarEntity pulsarEntity = pulsarEntityMapper.selectByGroupId(groupId);
+ if (pulsarEntity != null && StringUtils.isNotEmpty(pulsarEntity.getTenant())) {
+ tenant = pulsarEntity.getTenant();
+ }
+ topicConfig.setInlongGroupId(groupId + "/" + streamId);
+ topicConfig.setTopic("persistent://" + tenant + "/" + mqResource + "/" + topic);
+ topicList.add(topicConfig);
+ }
+ } else if (Constant.MIDDLEWARE_TUBE.equals(middlewareType)) {
+ DataProxyConfig topicConfig = new DataProxyConfig();
+ topicConfig.setInlongGroupId(groupId);
+ topicConfig.setTopic(mqResource);
+ topicList.add(topicConfig);
+ }
+ }
+
+ // construct pulsarSet info
+ List<ThirdPartyClusterInfo> mqSet = new ArrayList<>();
+ ClusterPageRequest request = new ClusterPageRequest();
+ request.setMqSetName(clusterEntity.getMqSetName());
+ List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectByCondition(request);
+ for (ThirdPartyClusterEntity cluster : clusterList) {
+ ThirdPartyClusterInfo clusterInfo = new ThirdPartyClusterInfo();
+ clusterInfo.setUrl(cluster.getUrl());
+ clusterInfo.setToken(cluster.getToken());
+ Map<String, String> configParams = GSON.fromJson(cluster.getExtParams(), Map.class);
+ clusterInfo.setParams(configParams);
+
+ mqSet.add(clusterInfo);
+ }
+
+ ThirdPartyClusterDTO object = new ThirdPartyClusterDTO();
+ object.setMqSet(mqSet);
+ object.setTopicList(topicList);
+
+ return object;
}
}
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index e93bdaf..bb6cc78 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -312,9 +312,6 @@ CREATE TABLE `data_proxy_cluster`
UNIQUE KEY `cluster_name` (`name`, `is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='DataProxy cluster table';
--- add default data proxy address
-insert into data_proxy_cluster (name, address, port, status, is_deleted, creator, create_time, modify_time)
-values ("default_dataproxy", "dataproxy", 46801, 0, 0, "admin", now(), now());
-- ----------------------------
-- Table structure for data_schema
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ClusterController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ClusterController.java
index 95eb2ae..d8ca4bf 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ClusterController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ClusterController.java
@@ -23,10 +23,9 @@ import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
-import org.apache.inlong.manager.common.pojo.cluster.DataProxyClusterInfo;
-import org.apache.inlong.manager.common.pojo.cluster.DataProxyClusterPageRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterResponse;
import org.apache.inlong.manager.common.util.LoginUserUtils;
import org.apache.inlong.manager.service.core.DataProxyClusterService;
import org.apache.inlong.manager.service.core.ThirdPartyClusterService;
@@ -41,8 +40,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
-import java.util.List;
-
/**
* Cluster controller
*/
@@ -59,30 +56,30 @@ public class ClusterController {
@PostMapping(value = "/save")
@ApiOperation(value = "Save cluster info")
@OperationLog(operation = OperationType.CREATE)
- public Response<Integer> save(@RequestBody ClusterInfo clusterInfo) {
+ public Response<Integer> save(@RequestBody ClusterRequest request) {
String currentUser = LoginUserUtils.getLoginUserDetail().getUserName();
- return Response.success(thirdPartyClusterService.save(clusterInfo, currentUser));
+ return Response.success(thirdPartyClusterService.save(request, currentUser));
}
@GetMapping(value = "/get/{id}")
@ApiOperation(value = "Get cluster info by id")
@ApiImplicitParam(name = "id", value = "common cluster ID", dataTypeClass = Integer.class, required = true)
- public Response<ClusterInfo> get(@PathVariable Integer id) {
+ public Response<ClusterResponse> get(@PathVariable Integer id) {
return Response.success(thirdPartyClusterService.get(id));
}
@PostMapping(value = "/list")
- @ApiOperation(value = "List clusters by condition")
- public Response<List<ClusterInfo>> list(@RequestBody ClusterRequest request) {
+ @ApiOperation(value = "Get clusters by paginating")
+ public Response<PageInfo<ClusterResponse>> list(@RequestBody ClusterPageRequest request) {
return Response.success(thirdPartyClusterService.list(request));
}
@PostMapping(value = "/update")
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update cluster info")
- public Response<Boolean> update(@RequestBody ClusterInfo clusterInfo) {
+ public Response<Boolean> update(@RequestBody ClusterRequest request) {
String username = LoginUserUtils.getLoginUserDetail().getUserName();
- return Response.success(thirdPartyClusterService.update(clusterInfo, username));
+ return Response.success(thirdPartyClusterService.update(request, username));
}
@DeleteMapping(value = "/delete/{id}")
@@ -97,23 +94,23 @@ public class ClusterController {
@PostMapping(value = "/thirdparty/save")
@ApiOperation(value = "Add a cluster info")
@OperationLog(operation = OperationType.CREATE)
- public Response<Integer> saveClusterV1(@RequestBody ClusterInfo clusterInfo) {
+ public Response<Integer> saveClusterV1(@RequestBody ClusterRequest request) {
String currentUser = LoginUserUtils.getLoginUserDetail().getUserName();
- return Response.success(thirdPartyClusterService.save(clusterInfo, currentUser));
+ return Response.success(thirdPartyClusterService.save(request, currentUser));
}
@Deprecated
@GetMapping(value = "/thirdparty/get/{id}")
@ApiOperation(value = "Query third party cluster information of the common")
@ApiImplicitParam(name = "id", value = "common cluster ID", dataTypeClass = Integer.class, required = true)
- public Response<ClusterInfo> getClusterV1(@PathVariable Integer id) {
+ public Response<ClusterResponse> getClusterV1(@PathVariable Integer id) {
return Response.success(thirdPartyClusterService.get(id));
}
@Deprecated
@PostMapping(value = "/thirdparty/list")
@ApiOperation(value = "Query the list of general clusters based on conditions")
- public Response<List<ClusterInfo>> listV1(@RequestBody ClusterRequest request) {
+ public Response<PageInfo<ClusterResponse>> listV1(@RequestBody ClusterPageRequest request) {
return Response.success(thirdPartyClusterService.list(request));
}
@@ -121,9 +118,9 @@ public class ClusterController {
@RequestMapping(value = "/thirdparty/update", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Modify third party cluster information of the common")
- public Response<Boolean> updateClusterV1(@RequestBody ClusterInfo clusterInfo) {
+ public Response<Boolean> updateClusterV1(@RequestBody ClusterRequest request) {
String username = LoginUserUtils.getLoginUserDetail().getUserName();
- return Response.success(thirdPartyClusterService.update(clusterInfo, username));
+ return Response.success(thirdPartyClusterService.update(request, username));
}
@Deprecated
@@ -135,42 +132,4 @@ public class ClusterController {
return Response.success(thirdPartyClusterService.delete(id, LoginUserUtils.getLoginUserDetail().getUserName()));
}
- @RequestMapping(value = "/dataproxy/save", method = RequestMethod.POST)
- @OperationLog(operation = OperationType.CREATE)
- @ApiOperation(value = "Save cluster information of the DataProxy")
- public Response<Integer> saveDataProxy(@RequestBody DataProxyClusterInfo clusterInfo) {
- String currentUser = LoginUserUtils.getLoginUserDetail().getUserName();
- return Response.success(dataProxyClusterService.save(clusterInfo, currentUser));
- }
-
- @RequestMapping(value = "/dataproxy/get/{id}", method = RequestMethod.GET)
- @ApiOperation(value = "Query cluster information of the DataProxy")
- @ApiImplicitParam(name = "id", value = "DataProxy cluster ID", dataTypeClass = Integer.class, required = true)
- public Response<DataProxyClusterInfo> getDataProxy(@PathVariable Integer id) {
- return Response.success(dataProxyClusterService.get(id));
- }
-
- @RequestMapping(value = "/dataproxy/list", method = RequestMethod.GET)
- @ApiOperation(value = "Query the list of DataProxy clusters based on conditions")
- public Response<PageInfo<DataProxyClusterInfo>> listDataProxyByCondition(DataProxyClusterPageRequest request) {
- request.setCurrentUser(LoginUserUtils.getLoginUserDetail().getUserName());
- return Response.success(dataProxyClusterService.listByCondition(request));
- }
-
- @RequestMapping(value = "/dataproxy/update", method = RequestMethod.POST)
- @OperationLog(operation = OperationType.UPDATE)
- @ApiOperation(value = "Modify cluster information of the DataProxy")
- public Response<Boolean> updateDataProxy(@RequestBody DataProxyClusterInfo clusterInfo) {
- String username = LoginUserUtils.getLoginUserDetail().getUserName();
- return Response.success(dataProxyClusterService.update(clusterInfo, username));
- }
-
- @RequestMapping(value = "/dataproxy/delete/{id}", method = RequestMethod.DELETE)
- @ApiOperation(value = "Delete cluster information of the dataproxy")
- @OperationLog(operation = OperationType.DELETE)
- @ApiImplicitParam(name = "id", value = "DataProxy cluster id", dataTypeClass = Integer.class, required = true)
- public Response<Boolean> deleteDataProxy(@PathVariable Integer id) {
- return Response.success(dataProxyClusterService.delete(id, LoginUserUtils.getLoginUserDetail().getUserName()));
- }
-
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index 3c3c630..d25d5a3 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -22,14 +22,15 @@ import io.swagger.annotations.ApiOperation;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
import org.apache.inlong.manager.common.beans.Response;
-import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpRequest;
-import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpResponse;
+import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyRequest;
+import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyResponse;
import org.apache.inlong.manager.service.core.DataProxyClusterService;
+import org.apache.inlong.manager.service.core.ThirdPartyClusterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@@ -37,44 +38,41 @@ import java.util.List;
@RestController
@RequestMapping("/openapi/dataproxy")
-@Api(tags = "DataProxy Config")
+@Api(tags = "DataProxy-Config")
public class DataProxyController {
@Autowired
private DataProxyClusterService dataProxyClusterService;
+ @Autowired
+ private ThirdPartyClusterService thirdPartyClusterService;
- @PostMapping("/getIpList")
- @ApiOperation(value = "get data proxy ip list")
- public Response<List<DataProxyIpResponse>> postIpList(@RequestBody DataProxyIpRequest request) {
- return Response.success(dataProxyClusterService.getIpList(request));
- }
-
- @GetMapping("/getIpList")
- @ApiOperation(value = "get data proxy ip list")
- public Response<List<DataProxyIpResponse>> getIpList(@RequestBody DataProxyIpRequest request) {
- return Response.success(dataProxyClusterService.getIpList(request));
+ @RequestMapping(value = "/getIpList", method = {RequestMethod.GET, RequestMethod.POST})
+ @ApiOperation(value = "Get data proxy ip list")
+ public Response<List<DataProxyResponse>> getIpList(@RequestBody(required = false) DataProxyRequest request) {
+ return Response.success(thirdPartyClusterService.getIpList(request));
}
@GetMapping("/getConfig")
- @ApiOperation(value = "get data proxy config list")
+ @ApiOperation(value = "Get data proxy topic list")
public Response<List<DataProxyConfig>> getConfig() {
- return Response.success(dataProxyClusterService.getConfig());
+ return Response.success(thirdPartyClusterService.getConfig());
}
@GetMapping("/getConfig_v2")
- @ApiOperation(value = "get dataproxy config list, including pulsar cluster config and topic")
+ @ApiOperation(value = "Get data proxy list - including topic")
public Response<ThirdPartyClusterDTO> getConfigV2(@RequestParam("clusterName") String clusterName) {
- ThirdPartyClusterDTO dto = dataProxyClusterService.getConfigV2(clusterName);
+ ThirdPartyClusterDTO dto = thirdPartyClusterService.getConfigV2(clusterName);
if (dto.getMqSet().isEmpty() || dto.getTopicList().isEmpty()) {
- return Response.fail("fail to get mq config or topics");
+ return Response.fail("failed to get mq config or topics");
}
return Response.success(dto);
}
@GetMapping("/getAllConfig")
- @ApiOperation(value = "get data proxy config")
+ @ApiOperation(value = "Get all proxy config")
public String getAllConfig(@RequestParam("clusterName") String clusterName, @RequestParam("setName") String setName,
@RequestParam("md5") String md5) {
return dataProxyClusterService.getAllConfig(clusterName, setName, md5);
}
+
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenClusterController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenClusterController.java
index 05b8dad..527bb40 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenClusterController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenClusterController.java
@@ -17,13 +17,15 @@
package org.apache.inlong.manager.web.controller.openapi;
+import com.github.pagehelper.PageInfo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterResponse;
import org.apache.inlong.manager.service.core.ThirdPartyClusterService;
import org.apache.inlong.manager.service.core.operationlog.OperationLog;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,8 +37,6 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
-import java.util.List;
-
/**
* Cluster controller
*/
@@ -51,28 +51,28 @@ public class OpenClusterController {
@PostMapping(value = "/save")
@ApiOperation(value = "Save cluster info")
@OperationLog(operation = OperationType.CREATE)
- public Response<Integer> save(@RequestBody ClusterInfo clusterInfo) {
- return Response.success(thirdPartyClusterService.save(clusterInfo, null));
+ public Response<Integer> save(@RequestBody ClusterRequest request) {
+ return Response.success(thirdPartyClusterService.save(request, null));
}
@GetMapping(value = "/get/{id}")
@ApiOperation(value = "Get cluster by id")
@ApiImplicitParam(name = "id", value = "common cluster ID", dataTypeClass = Integer.class, required = true)
- public Response<ClusterInfo> get(@PathVariable Integer id) {
+ public Response<ClusterResponse> get(@PathVariable Integer id) {
return Response.success(thirdPartyClusterService.get(id));
}
@PostMapping(value = "/list")
- @ApiOperation(value = "List clusters by condition")
- public Response<List<ClusterInfo>> list(@RequestBody ClusterRequest request) {
+ @ApiOperation(value = "Get clusters by paginating")
+ public Response<PageInfo<ClusterResponse>> list(@RequestBody ClusterPageRequest request) {
return Response.success(thirdPartyClusterService.list(request));
}
@PostMapping(value = "/update")
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update cluster info")
- public Response<Boolean> update(@RequestBody ClusterInfo clusterInfo) {
- return Response.success(thirdPartyClusterService.update(clusterInfo, null));
+ public Response<Boolean> update(@RequestBody ClusterRequest request) {
+ return Response.success(thirdPartyClusterService.update(request, null));
}
@DeleteMapping(value = "/delete/{id}")