You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/12 07:43:56 UTC
[inlong] branch master updated: [INLONG-5978][Manager] Add protocol type in cluster nodes (#6055)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new aa91f5984 [INLONG-5978][Manager] Add protocol type in cluster nodes (#6055)
aa91f5984 is described below
commit aa91f5984cf0cd230cba9098834b028a554fc65c
Author: xuesongxs <54...@users.noreply.github.com>
AuthorDate: Wed Oct 12 15:43:49 2022 +0800
[INLONG-5978][Manager] Add protocol type in cluster nodes (#6055)
* Add protocol type constant class
* Fix unit test error
* Update NodeEditModal.tsx
Co-authored-by: healchow <he...@gmail.com>
Co-authored-by: Daniel <le...@apache.org>
---
.../common/heartbeat/ComponentHeartbeat.java | 5 +-
.../inlong/common/heartbeat/HeartbeatMsg.java | 7 ++-
.../common/pojo/dataproxy/DataProxyNodeInfo.java | 4 ++
inlong-dashboard/src/locales/cn.json | 2 +
inlong-dashboard/src/locales/en.json | 2 +
.../src/pages/Clusters/NodeEditModal.tsx | 19 ++++++++
inlong-dashboard/src/pages/Clusters/NodeManage.tsx | 4 ++
.../inlong/manager/common/consts/ProtocolType.java | 26 +++-------
.../dao/entity/InlongClusterNodeEntity.java | 1 +
.../dao/mapper/InlongClusterNodeEntityMapper.java | 3 +-
.../mappers/InlongClusterNodeEntityMapper.xml | 57 ++++++++++++----------
.../manager/pojo/cluster/ClusterNodeRequest.java | 4 ++
.../manager/pojo/cluster/ClusterNodeResponse.java | 3 ++
.../service/cluster/InlongClusterService.java | 5 +-
.../service/cluster/InlongClusterServiceImpl.java | 30 +++++-------
.../service/core/heartbeat/HeartbeatManager.java | 2 +
.../service/cluster/InlongClusterServiceTest.java | 46 +++++++++++------
.../core/heartbeat/HeartbeatManagerTest.java | 3 ++
.../service/core/impl/HeartbeatServiceTest.java | 2 +
.../main/resources/h2/apache_inlong_manager.sql | 31 ++++++------
.../manager-web/sql/apache_inlong_manager.sql | 31 ++++++------
.../controller/openapi/DataProxyController.java | 6 ++-
22 files changed, 181 insertions(+), 112 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
index 260e4618c..8d6beda08 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
@@ -35,18 +35,21 @@ public class ComponentHeartbeat {
private int port;
+ private String protocolType;
+
private String inCharges;
public ComponentHeartbeat() {
}
public ComponentHeartbeat(String clusterTag, String clusterName, String componentType, String ip, int port,
- String inCharges) {
+ String inCharges, String protocolType) {
this.clusterTag = clusterTag;
this.clusterName = clusterName;
this.componentType = componentType;
this.ip = ip;
this.port = port;
+ this.protocolType = protocolType;
this.inCharges = inCharges;
}
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
index 5277eaf22..42038d993 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
@@ -43,6 +43,11 @@ public class HeartbeatMsg {
*/
private int port;
+ /**
+ * ProtocolType of component
+ */
+ private String protocolType;
+
/**
* Type of component
*/
@@ -79,6 +84,6 @@ public class HeartbeatMsg {
private List<StreamHeartbeat> streamHeartbeats;
public ComponentHeartbeat componentHeartbeat() {
- return new ComponentHeartbeat(clusterTag, clusterName, componentType, ip, port, inCharges);
+ return new ComponentHeartbeat(clusterTag, clusterName, componentType, ip, port, inCharges, protocolType);
}
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
index 6425348a8..9aca22240 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
@@ -40,4 +40,8 @@ public class DataProxyNodeInfo {
*/
private Integer port;
+ /**
+ * Node protocol type
+ */
+ private String protocolType;
}
diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index 16e5161f7..feeb8b4d6 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -432,10 +432,12 @@
"pages.Clusters.Description": "集群描述",
"pages.Clusters.Node.Name": "节点",
"pages.Clusters.Node.Port": "端口",
+ "pages.Clusters.Node.ProtocolType": "协议类型",
"pages.Clusters.Node.LastModifier": "最后操作",
"pages.Clusters.Node.Create": "新建节点",
"pages.Clusters.Node.IpRule": "请输入正确的IP地址",
"pages.Clusters.Node.PortRule": "请输入正确的端口",
+ "pages.Clusters.Node.ProtocolTypeRule": "请输入正确的协议类型",
"pages.Clusters.Pulsar.Tenant": "默认租户",
"pages.Clusters.Pulsar.ServiceUrlHelper": "用于生产和消费数据",
"pages.Clusters.Pulsar.AdminUrlHelper": "用于管理(如:创建、修改)租户、命名空间、Topic 和订阅组",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 691e8f98d..e9da7052a 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -433,10 +433,12 @@
"pages.Clusters.Description": "Description",
"pages.Clusters.Node.Name": "Node",
"pages.Clusters.Node.Port": "Port",
+ "pages.Clusters.Node.ProtocolType": "Protocol Type",
"pages.Clusters.Node.LastModifier": "LastModifier",
"pages.Clusters.Node.Create": "Create",
"pages.Clusters.Node.IpRule": "Please enter the IP address correctly",
"pages.Clusters.Node.PortRule": "Please enter the port address correctly",
+ "pages.Clusters.Node.ProtocolTypeRule": "Please enter the protocol type correctly",
"pages.Clusters.Pulsar.Tenant": "Default Tenant",
"pages.Clusters.Pulsar.ServiceUrlHelper": "For producing and consuming data",
"pages.Clusters.Pulsar.AdminUrlHelper": "Used to manage (e.g. create, modify) tenants, namespaces, topics and subscription groups",
diff --git a/inlong-dashboard/src/pages/Clusters/NodeEditModal.tsx b/inlong-dashboard/src/pages/Clusters/NodeEditModal.tsx
index 167330348..a085ec0c0 100644
--- a/inlong-dashboard/src/pages/Clusters/NodeEditModal.tsx
+++ b/inlong-dashboard/src/pages/Clusters/NodeEditModal.tsx
@@ -106,6 +106,25 @@ const NodeEditModal: React.FC<NodeEditModalProps> = ({ id, type, clusterId, ...m
max: 65535,
},
},
+ {
+ type: 'select',
+ label: i18n.t('pages.Clusters.Node.ProtocolType'),
+ name: 'protocolType',
+ initialValue: 'HTTP',
+ rules: [{ required: true }],
+ props: {
+ options: [
+ {
+ label: 'HTTP',
+ value: 'HTTP',
+ },
+ {
+ label: 'TCP',
+ value: 'TCP',
+ },
+ ],
+ },
+ },
{
type: 'textarea',
label: i18n.t('pages.Clusters.Description'),
diff --git a/inlong-dashboard/src/pages/Clusters/NodeManage.tsx b/inlong-dashboard/src/pages/Clusters/NodeManage.tsx
index 393cf41a1..9368f9128 100644
--- a/inlong-dashboard/src/pages/Clusters/NodeManage.tsx
+++ b/inlong-dashboard/src/pages/Clusters/NodeManage.tsx
@@ -125,6 +125,10 @@ const Comp: React.FC = () => {
title: i18n.t('pages.Clusters.Node.Port'),
dataIndex: 'port',
},
+ {
+ title: i18n.t('pages.Clusters.Node.ProtocolType'),
+ dataIndex: 'protocolType',
+ },
{
title: i18n.t('pages.Clusters.Node.LastModifier'),
dataIndex: 'modifier',
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/ProtocolType.java
similarity index 72%
copy from inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/ProtocolType.java
index 6425348a8..8a761fc8b 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/ProtocolType.java
@@ -15,29 +15,17 @@
* limitations under the License.
*/
-package org.apache.inlong.common.pojo.dataproxy;
-
-import lombok.Data;
+package org.apache.inlong.manager.common.consts;
/**
- * Data proxy node info.
+ * Constants of protocol type.
*/
-@Data
-public class DataProxyNodeInfo {
-
- /**
- * DataProxy node id
- */
- private Integer id;
+public class ProtocolType {
- /**
- * Node IP
- */
- private String ip;
+ public static final String TCP = "TCP";
+ public static final String UDP = "UDP";
- /**
- * Node port
- */
- private Integer port;
+ public static final String HTTP = "HTTP";
+ public static final String HTTPS = "HTTPS";
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
index 56fdef9e9..a201cbc60 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
@@ -34,6 +34,7 @@ public class InlongClusterNodeEntity implements Serializable {
private String type;
private String ip;
private Integer port;
+ private String protocolType;
private String extParams;
private String description;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
index 29ea012d5..dabc837dd 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
@@ -38,7 +38,8 @@ public interface InlongClusterNodeEntityMapper {
List<InlongClusterNodeEntity> selectByCondition(ClusterPageRequest request);
- List<InlongClusterNodeEntity> selectByParentId(@Param("parentId") Integer parentId);
+ List<InlongClusterNodeEntity> selectByParentId(@Param("parentId") Integer parentId,
+ @Param("protocolType") String protocolType);
int updateById(InlongClusterNodeEntity record);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index c184eab02..294e08845 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -26,6 +26,7 @@
<result column="type" jdbcType="VARCHAR" property="type"/>
<result column="ip" jdbcType="VARCHAR" property="ip"/>
<result column="port" jdbcType="INTEGER" property="port"/>
+ <result column="protocol_type" jdbcType="VARCHAR" property="protocolType"/>
<result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
<result column="description" jdbcType="VARCHAR" property="description"/>
<result column="status" jdbcType="INTEGER" property="status"/>
@@ -37,30 +38,32 @@
<result column="version" jdbcType="INTEGER" property="version"/>
</resultMap>
<sql id="Base_Column_List">
- id, parent_id, type, ip, port, ext_params, description, status, is_deleted,
+ id, parent_id, type, ip, port, protocol_type, ext_params, description, status, is_deleted,
creator, modifier, create_time, modify_time, version
</sql>
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
insert into inlong_cluster_node (id, parent_id, type,
- ip, port, ext_params,
- description, status,
+ ip, port, protocol_type,
+ ext_params, description, status,
creator, modifier)
values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR},
- #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
- #{description, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
+ #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
+ #{extParams,jdbcType=LONGVARCHAR}, #{description, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
#{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
</insert>
<insert id="insertOnDuplicateKeyUpdate" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
insert into inlong_cluster_node (id, parent_id, type,
- ip, port, ext_params,
- status, creator, modifier)
+ ip, port, protocol_type,
+ ext_params, status, creator,
+ modifier)
values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR},
- #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
- #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
+ #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
+ #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR},
+ #{modifier,jdbcType=VARCHAR})
ON DUPLICATE KEY UPDATE ext_params = VALUES(ext_params),
status = VALUES(status),
modifier = VALUES(modifier)
@@ -83,6 +86,7 @@
and type = #{type, jdbcType=VARCHAR}
and ip = #{ip, jdbcType=VARCHAR}
and port = #{port, jdbcType=INTEGER}
+ and protocol_type = #{protocolType, jdbcType=VARCHAR}
</select>
<select id="selectByCondition"
parameterType="org.apache.inlong.manager.pojo.cluster.ClusterPageRequest"
@@ -95,12 +99,9 @@
<if test="type != null and type != ''">
and type = #{type, jdbcType=VARCHAR}
</if>
- <if test="parentId != null">
+ <if test="parentId != null and parentId != ''">
and parent_id = #{parentId, jdbcType=INTEGER}
</if>
- <if test="status != null">
- and status = #{status, jdbcType=INTEGER}
- </if>
<if test="keyword != null and keyword != ''">
and (
ip like CONCAT('%', #{keyword}, '%')
@@ -114,22 +115,28 @@
select
<include refid="Base_Column_List"/>
from inlong_cluster_node
- where is_deleted = 0
- and parent_id = #{parentId, jdbcType=INTEGER}
+ <where>
+ is_deleted = 0
+ and parent_id = #{parentId, jdbcType=INTEGER}
+ <if test="protocolType != null and protocolType != ''">
+ and protocol_type = #{protocolType, jdbcType=VARCHAR}
+ </if>
+ </where>
</select>
<update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
update inlong_cluster_node
- set parent_id = #{parentId,jdbcType=INTEGER},
- type = #{type,jdbcType=VARCHAR},
- ip = #{ip,jdbcType=VARCHAR},
- port = #{port,jdbcType=INTEGER},
- ext_params = #{extParams,jdbcType=LONGVARCHAR},
- description = #{description,jdbcType=VARCHAR},
- status = #{status,jdbcType=INTEGER},
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- modifier = #{modifier,jdbcType=VARCHAR},
- version = #{version,jdbcType=INTEGER} + 1
+ set parent_id = #{parentId,jdbcType=INTEGER},
+ type = #{type,jdbcType=VARCHAR},
+ ip = #{ip,jdbcType=VARCHAR},
+ port = #{port,jdbcType=INTEGER},
+ protocol_type = #{protocolType,jdbcType=VARCHAR},
+ ext_params = #{extParams,jdbcType=LONGVARCHAR},
+ description = #{description,jdbcType=VARCHAR},
+ status = #{status,jdbcType=INTEGER},
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ modifier = #{modifier,jdbcType=VARCHAR},
+ version = #{version,jdbcType=INTEGER} + 1
where id = #{id,jdbcType=INTEGER}
and version = #{version,jdbcType=INTEGER}
</update>
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
index 31cc74458..384f8056d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
@@ -52,6 +52,10 @@ public class ClusterNodeRequest {
@ApiModelProperty(value = "Cluster port")
private Integer port;
+ @NotBlank(message = "protocolType cannot be blank")
+ @ApiModelProperty(value = "Cluster protocol type")
+ private String protocolType;
+
@ApiModelProperty(value = "Extended params")
private String extParams;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java
index ee45d9642..cdd426178 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java
@@ -52,6 +52,9 @@ public class ClusterNodeResponse {
@ApiModelProperty(value = "Cluster port")
private Integer port;
+ @ApiModelProperty(value = "Cluster protocol type")
+ private String protocolType;
+
@ApiModelProperty(value = "Extended params")
private String extParams;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 244ee8372..11ca3120f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -199,12 +199,13 @@ public interface InlongClusterService {
Boolean deleteNode(Integer id, String operator);
/**
- * Query data proxy nodes by the given inlong group id.
+ * Query data proxy nodes by the given inlong group id and protocol type.
*
* @param inlongGroupId inlong group id
+ * @param protocolType protocol type
* @return data proxy node response
*/
- DataProxyNodeResponse getDataProxyNodes(String inlongGroupId);
+ DataProxyNodeResponse getDataProxyNodes(String inlongGroupId, String protocolType);
/**
* Get the configuration of DataProxy through the cluster name to which DataProxy belongs.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index e175f8c00..90ba7c1fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -36,7 +36,6 @@ import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -440,7 +439,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
Preconditions.checkTrue(isInCharge || userEntity.getAccountType().equals(UserTypeEnum.ADMIN.getCode()),
"Current user does not have permission to delete cluster info");
- List<InlongClusterNodeEntity> nodeEntities = clusterNodeMapper.selectByParentId(id);
+ List<InlongClusterNodeEntity> nodeEntities = clusterNodeMapper.selectByParentId(id, null);
if (CollectionUtils.isNotEmpty(nodeEntities)) {
String errMsg = String.format("there are undeleted nodes under the cluster [%s], "
+ "please delete the node first", entity.getName());
@@ -533,7 +532,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
request.getType());
List<InlongClusterNodeEntity> allNodeList = new ArrayList<>();
for (InlongClusterEntity cluster : clusterList) {
- List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(cluster.getId());
+ List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(cluster.getId(), null);
allNodeList.addAll(nodeList);
}
return CommonBeanUtils.copyListProperties(allNodeList, ClusterNodeResponse::new);
@@ -568,8 +567,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
// check cluster node if exist
InlongClusterNodeEntity exist = clusterNodeMapper.selectByUniqueKey(request);
if (exist != null && !Objects.equals(id, exist.getId())) {
- String errMsg = String.format("inlong cluster node already exist for type=%s ip=%s port=%s",
- request.getType(), request.getIp(), request.getPort());
+ String errMsg = "inlong cluster node already exist for " + request;
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
}
@@ -579,8 +577,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
LOGGER.error("cluster node not found by id={}", id);
throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
}
- String errMsg = String.format("cluster node has already updated with parentId=%s, type=%s, ip=%s, port=%s",
- request.getParentId(), request.getType(), request.getIp(), request.getPort());
+ String errMsg = "cluster node has already updated for " + request;
if (!Objects.equals(entity.getVersion(), request.getVersion())) {
LOGGER.warn(errMsg);
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
@@ -617,8 +614,8 @@ public class InlongClusterServiceImpl implements InlongClusterService {
entity.setIsDeleted(entity.getId());
entity.setModifier(operator);
if (InlongConstants.AFFECTED_ONE_ROW != clusterNodeMapper.updateById(entity)) {
- LOGGER.error("cluster node has already updated with parentId={}, type={}, ip={}, port={}",
- entity.getParentId(), entity.getType(), entity.getIp(), entity.getPort());
+ LOGGER.error("cluster node has already updated with parentId={}, type={}, ip={}, port={}, protocolType={}",
+ entity.getParentId(), entity.getType(), entity.getIp(), entity.getPort(), entity.getProtocolType());
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
LOGGER.info("success to delete inlong cluster node by id={}", id);
@@ -626,8 +623,8 @@ public class InlongClusterServiceImpl implements InlongClusterService {
}
@Override
- public DataProxyNodeResponse getDataProxyNodes(String inlongGroupId) {
- LOGGER.debug("begin to get data proxy nodes for inlongGroupId={}", inlongGroupId);
+ public DataProxyNodeResponse getDataProxyNodes(String inlongGroupId, String protocolType) {
+ LOGGER.debug("begin to get data proxy nodes for groupId={}, protocol={}", inlongGroupId, protocolType);
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(inlongGroupId);
if (groupEntity == null) {
String msg = "inlong group not exists for inlongGroupId=" + inlongGroupId;
@@ -650,19 +647,17 @@ public class InlongClusterServiceImpl implements InlongClusterService {
throw new BusinessException(msg);
}
+ // if more than one data proxy cluster, currently takes first
// TODO consider the data proxy load and re-balance
List<DataProxyNodeInfo> nodeInfos = new ArrayList<>();
for (InlongClusterEntity entity : clusterList) {
- ClusterPageRequest request = ClusterPageRequest.builder()
- .parentId(entity.getId())
- .status(NodeStatus.NORMAL.getStatus())
- .build();
- List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByCondition(request);
+ List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(entity.getId(), protocolType);
for (InlongClusterNodeEntity nodeEntity : nodeList) {
DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo();
nodeInfo.setId(nodeEntity.getId());
nodeInfo.setIp(nodeEntity.getIp());
nodeInfo.setPort(nodeEntity.getPort());
+ nodeInfo.setProtocolType(nodeEntity.getProtocolType());
nodeInfos.add(nodeInfo);
}
}
@@ -672,7 +667,8 @@ public class InlongClusterServiceImpl implements InlongClusterService {
response.setNodeList(nodeInfos);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("success to get data proxy nodes for inlongGroupId={}, result={}", inlongGroupId, response);
+ LOGGER.debug("success to get data proxy nodes for groupId={}, protocol={} result={}",
+ inlongGroupId, protocolType, response);
}
return response;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
index 6027fa908..6ae1f8060 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
@@ -131,6 +131,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
nodeRequest.setType(heartbeat.getComponentType());
nodeRequest.setIp(heartbeat.getIp());
nodeRequest.setPort(heartbeat.getPort());
+ nodeRequest.setProtocolType(heartbeat.getProtocolType());
return clusterNodeMapper.selectByUniqueKey(nodeRequest);
}
@@ -140,6 +141,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
clusterNode.setType(heartbeat.getComponentType());
clusterNode.setIp(heartbeat.getIp());
clusterNode.setPort(heartbeat.getPort());
+ clusterNode.setProtocolType(heartbeat.getProtocolType());
clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
clusterNode.setCreator(creator);
clusterNode.setModifier(creator);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 23b2326ce..baba6d294 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -22,6 +22,7 @@ import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.consts.ProtocolType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
@@ -36,6 +37,8 @@ import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.core.heartbeat.HeartbeatManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Comparator;
@@ -46,6 +49,7 @@ import java.util.List;
*/
public class InlongClusterServiceTest extends ServiceBaseTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceTest.class);
@Autowired
private InlongClusterService clusterService;
@Autowired
@@ -112,12 +116,13 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
/**
* Save cluster node info.
*/
- public Integer saveClusterNode(Integer parentId, String type, String ip, Integer port) {
+ public Integer saveClusterNode(Integer parentId, String type, String ip, Integer port, String protocolType) {
ClusterNodeRequest request = new ClusterNodeRequest();
request.setParentId(parentId);
request.setType(type);
request.setIp(ip);
request.setPort(port);
+ request.setProtocolType(protocolType);
return clusterService.saveNode(request, GLOBAL_OPERATOR);
}
@@ -149,6 +154,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
heartbeatMsg.setIp(ip);
heartbeatMsg.setPort(port);
+ heartbeatMsg.setProtocolType(ProtocolType.HTTP);
heartbeatMsg.setComponentType(type);
heartbeatMsg.setReportTime(System.currentTimeMillis());
heartbeatMsg.setClusterName(clusterName);
@@ -193,7 +199,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
Integer parentId = id;
String ip = "127.0.0.1";
Integer port = 8080;
- Integer nodeId = this.saveClusterNode(parentId, ClusterType.PULSAR, ip, port);
+ Integer nodeId = this.saveClusterNode(parentId, ClusterType.PULSAR, ip, port, ProtocolType.HTTP);
Assertions.assertNotNull(nodeId);
// list cluster node
@@ -217,7 +223,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
}
@Test
- public void testGetDataProxyIp() throws InterruptedException {
+ public void testGetDataProxyIp() {
String clusterTag = "default_cluster";
String clusterName = "test_data_proxy";
String extTag = "ext_1";
@@ -229,11 +235,11 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
// save cluster node
String ip = "127.0.0.1";
Integer port1 = 46800;
- Integer nodeId1 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port1);
+ Integer nodeId1 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port1, ProtocolType.TCP);
Assertions.assertNotNull(nodeId1);
Integer port2 = 46801;
- Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2);
+ Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2, ProtocolType.TCP);
Assertions.assertNotNull(nodeId2);
// report heartbeat
@@ -250,15 +256,27 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
groupService.update(updateGroupInfo.genRequest(), GLOBAL_OPERATOR);
// get the data proxy nodes; the first port should be p1 and the second port should be p2
- DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId);
- List<DataProxyNodeInfo> ipList = nodeResponse.getNodeList();
- ipList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));
- Assertions.assertEquals(ipList.size(), 2);
- Assertions.assertEquals(port1, ipList.get(0).getPort());
- Assertions.assertEquals(port2, ipList.get(1).getPort());
-
- this.deleteClusterNode(nodeId1);
- this.deleteClusterNode(nodeId2);
+ DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.TCP);
+ List<DataProxyNodeInfo> nodeInfoList = nodeResponse.getNodeList();
+ nodeInfoList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));
+ Assertions.assertEquals(nodeInfoList.size(), 2);
+ Assertions.assertEquals(port1, nodeInfoList.get(0).getPort());
+ Assertions.assertEquals(port2, nodeInfoList.get(1).getPort());
+
+ nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.HTTP);
+ nodeInfoList = nodeResponse.getNodeList();
+ nodeInfoList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));
+ Assertions.assertEquals(nodeInfoList.size(), 2);
+ Assertions.assertEquals(port1, nodeInfoList.get(0).getPort());
+ Assertions.assertEquals(port2, nodeInfoList.get(1).getPort());
+
+ // delete all cluster nodes
+ // TODO should query by cluster parent id
+ nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, null);
+ for (DataProxyNodeInfo nodeInfo : nodeResponse.getNodeList()) {
+ this.deleteClusterNode(nodeInfo.getId());
+ }
+
this.deleteCluster(id);
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
index 84fef8e41..f6082528c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.core.heartbeat;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
+import org.apache.inlong.manager.common.consts.ProtocolType;
import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
@@ -63,6 +64,7 @@ public class HeartbeatManagerTest extends ServiceBaseTest {
nodeRequest.setType(msg.getComponentType());
nodeRequest.setIp(msg.getIp());
nodeRequest.setPort(msg.getPort());
+ nodeRequest.setProtocolType(ProtocolType.HTTP);
InlongClusterNodeEntity clusterNode = clusterNodeMapper.selectByUniqueKey(nodeRequest);
Assertions.assertNotNull(clusterNode);
Assertions.assertEquals((int) clusterNode.getStatus(), NodeStatus.NORMAL.getStatus());
@@ -83,6 +85,7 @@ public class HeartbeatManagerTest extends ServiceBaseTest {
HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
heartbeatMsg.setIp("127.0.0.1");
heartbeatMsg.setPort(8008);
+ heartbeatMsg.setProtocolType(ProtocolType.HTTP);
heartbeatMsg.setComponentType(ComponentTypeEnum.Agent.getName());
heartbeatMsg.setReportTime(System.currentTimeMillis());
return heartbeatMsg;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
index f7791c43c..76b331b8c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Maps;
import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.heartbeat.GroupHeartbeat;
import org.apache.inlong.common.heartbeat.StreamHeartbeat;
+import org.apache.inlong.manager.common.consts.ProtocolType;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.pojo.heartbeat.HeartbeatQueryRequest;
import org.apache.inlong.manager.pojo.heartbeat.HeartbeatReportRequest;
@@ -53,6 +54,7 @@ public class HeartbeatServiceTest extends ServiceBaseTest {
request.setComponentType(ComponentTypeEnum.Agent.getName());
request.setIp("127.0.0.1");
request.setReportTime(Instant.now().toEpochMilli());
+ request.setProtocolType(ProtocolType.HTTP);
List<GroupHeartbeat> groupHeartbeats = new ArrayList<>();
GroupHeartbeat groupHeartbeat = new GroupHeartbeat();
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 760d93316..eaba39763 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -122,22 +122,23 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster`
-- ----------------------------
CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `parent_id` int(11) NOT NULL COMMENT 'Id of the parent cluster',
- `type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc',
- `ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
- `port` int(6) NULL COMMENT 'Cluster port',
- `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
- `description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node',
- `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `parent_id` int(11) NOT NULL COMMENT 'Id of the parent cluster',
+ `type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc',
+ `ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
+ `port` int(6) NULL COMMENT 'Cluster port',
+ `protocol_type` varchar(20) NOT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
+ `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
+ `description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node',
+ `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
+ UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `protocol_type`, `is_deleted`)
);
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 825e6841e..699c3a1be 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -132,22 +132,23 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster`
-- ----------------------------
CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `parent_id` int(11) NOT NULL COMMENT 'Id of the parent cluster',
- `type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc',
- `ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
- `port` int(6) NULL COMMENT 'Cluster port',
- `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
- `description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node',
- `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `parent_id` int(11) NOT NULL COMMENT 'Id of the parent cluster',
+ `type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc',
+ `ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
+ `port` int(6) NULL COMMENT 'Cluster port',
+ `protocol_type` varchar(20) NOT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
+ `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
+ `description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node',
+ `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
+ UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `protocol_type`, `is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster node table';
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index 8569b3e5d..25ed2969d 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -50,10 +50,12 @@ public class DataProxyController {
@Autowired
private DataProxyConfigRepository dataProxyConfigRepository;
+ // TODO protocol type must be provided by the DataProxy
@PostMapping(value = "/dataproxy/getIpList/{inlongGroupId}")
@ApiOperation(value = "Get data proxy IP list by InlongGroupId")
- public Response<DataProxyNodeResponse> getIpList(@PathVariable String inlongGroupId) {
- return Response.success(clusterService.getDataProxyNodes(inlongGroupId));
+ public Response<DataProxyNodeResponse> getIpList(@PathVariable String inlongGroupId,
+ @RequestParam(required = false) String protocolType) {
+ return Response.success(clusterService.getDataProxyNodes(inlongGroupId, protocolType));
}
@PostMapping("/dataproxy/getConfig")