You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/12 07:43:56 UTC

[inlong] branch master updated: [INLONG-5978][Manager] Add protocol type in cluster nodes (#6055)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new aa91f5984 [INLONG-5978][Manager] Add protocol type in cluster nodes (#6055)
aa91f5984 is described below

commit aa91f5984cf0cd230cba9098834b028a554fc65c
Author: xuesongxs <54...@users.noreply.github.com>
AuthorDate: Wed Oct 12 15:43:49 2022 +0800

    [INLONG-5978][Manager] Add protocol type in cluster nodes (#6055)
    
    * Add protocol type constant class
    
    * Fix unit test error
    
    * Update NodeEditModal.tsx
    
    Co-authored-by: healchow <he...@gmail.com>
    Co-authored-by: Daniel <le...@apache.org>
---
 .../common/heartbeat/ComponentHeartbeat.java       |  5 +-
 .../inlong/common/heartbeat/HeartbeatMsg.java      |  7 ++-
 .../common/pojo/dataproxy/DataProxyNodeInfo.java   |  4 ++
 inlong-dashboard/src/locales/cn.json               |  2 +
 inlong-dashboard/src/locales/en.json               |  2 +
 .../src/pages/Clusters/NodeEditModal.tsx           | 19 ++++++++
 inlong-dashboard/src/pages/Clusters/NodeManage.tsx |  4 ++
 .../inlong/manager/common/consts/ProtocolType.java | 26 +++-------
 .../dao/entity/InlongClusterNodeEntity.java        |  1 +
 .../dao/mapper/InlongClusterNodeEntityMapper.java  |  3 +-
 .../mappers/InlongClusterNodeEntityMapper.xml      | 57 ++++++++++++----------
 .../manager/pojo/cluster/ClusterNodeRequest.java   |  4 ++
 .../manager/pojo/cluster/ClusterNodeResponse.java  |  3 ++
 .../service/cluster/InlongClusterService.java      |  5 +-
 .../service/cluster/InlongClusterServiceImpl.java  | 30 +++++-------
 .../service/core/heartbeat/HeartbeatManager.java   |  2 +
 .../service/cluster/InlongClusterServiceTest.java  | 46 +++++++++++------
 .../core/heartbeat/HeartbeatManagerTest.java       |  3 ++
 .../service/core/impl/HeartbeatServiceTest.java    |  2 +
 .../main/resources/h2/apache_inlong_manager.sql    | 31 ++++++------
 .../manager-web/sql/apache_inlong_manager.sql      | 31 ++++++------
 .../controller/openapi/DataProxyController.java    |  6 ++-
 22 files changed, 181 insertions(+), 112 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
index 260e4618c..8d6beda08 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
@@ -35,18 +35,21 @@ public class ComponentHeartbeat {
 
     private int port;
 
+    private String protocolType;
+
     private String inCharges;
 
     public ComponentHeartbeat() {
     }
 
     public ComponentHeartbeat(String clusterTag, String clusterName, String componentType, String ip, int port,
-            String inCharges) {
+            String inCharges, String protocolType) {
         this.clusterTag = clusterTag;
         this.clusterName = clusterName;
         this.componentType = componentType;
         this.ip = ip;
         this.port = port;
+        this.protocolType = protocolType;
         this.inCharges = inCharges;
     }
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
index 5277eaf22..42038d993 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
@@ -43,6 +43,11 @@ public class HeartbeatMsg {
      */
     private int port;
 
+    /**
+     * ProtocolType of component
+     */
+    private String protocolType;
+
     /**
      * Type of component
      */
@@ -79,6 +84,6 @@ public class HeartbeatMsg {
     private List<StreamHeartbeat> streamHeartbeats;
 
     public ComponentHeartbeat componentHeartbeat() {
-        return new ComponentHeartbeat(clusterTag, clusterName, componentType, ip, port, inCharges);
+        return new ComponentHeartbeat(clusterTag, clusterName, componentType, ip, port, inCharges, protocolType);
     }
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
index 6425348a8..9aca22240 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
@@ -40,4 +40,8 @@ public class DataProxyNodeInfo {
      */
     private Integer port;
 
+    /**
+     * Node protocol type
+     */
+    private String protocolType;
 }
diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index 16e5161f7..feeb8b4d6 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -432,10 +432,12 @@
   "pages.Clusters.Description": "集群描述",
   "pages.Clusters.Node.Name": "节点",
   "pages.Clusters.Node.Port": "端口",
+  "pages.Clusters.Node.ProtocolType": "协议类型",
   "pages.Clusters.Node.LastModifier": "最后操作",
   "pages.Clusters.Node.Create": "新建节点",
   "pages.Clusters.Node.IpRule": "请输入正确的IP地址",
   "pages.Clusters.Node.PortRule": "请输入正确的端口",
+  "pages.Clusters.Node.ProtocolTypeRule": "请输入正确的协议类型",
   "pages.Clusters.Pulsar.Tenant": "默认租户",
   "pages.Clusters.Pulsar.ServiceUrlHelper": "用于生产和消费数据",
   "pages.Clusters.Pulsar.AdminUrlHelper": "用于管理(如:创建、修改)租户、命名空间、Topic 和订阅组",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 691e8f98d..e9da7052a 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -433,10 +433,12 @@
   "pages.Clusters.Description": "Description",
   "pages.Clusters.Node.Name": "Node",
   "pages.Clusters.Node.Port": "Port",
+  "pages.Clusters.Node.ProtocolType": "Protocol Type",
   "pages.Clusters.Node.LastModifier": "LastModifier",
   "pages.Clusters.Node.Create": "Create",
   "pages.Clusters.Node.IpRule": "Please enter the IP address correctly",
   "pages.Clusters.Node.PortRule": "Please enter the port address correctly",
+  "pages.Clusters.Node.ProtocolTypeRule": "Please enter the protocol type correctly",
   "pages.Clusters.Pulsar.Tenant": "Default Tenant",
   "pages.Clusters.Pulsar.ServiceUrlHelper": "For producing and consuming data",
   "pages.Clusters.Pulsar.AdminUrlHelper": "Used to manage (e.g. create, modify) tenants, namespaces, topics and subscription groups",
diff --git a/inlong-dashboard/src/pages/Clusters/NodeEditModal.tsx b/inlong-dashboard/src/pages/Clusters/NodeEditModal.tsx
index 167330348..a085ec0c0 100644
--- a/inlong-dashboard/src/pages/Clusters/NodeEditModal.tsx
+++ b/inlong-dashboard/src/pages/Clusters/NodeEditModal.tsx
@@ -106,6 +106,25 @@ const NodeEditModal: React.FC<NodeEditModalProps> = ({ id, type, clusterId, ...m
           max: 65535,
         },
       },
+      {
+        type: 'select',
+        label: i18n.t('pages.Clusters.Node.ProtocolType'),
+        name: 'protocolType',
+        initialValue: 'HTTP',
+        rules: [{ required: true }],
+        props: {
+          options: [
+            {
+              label: 'HTTP',
+              value: 'HTTP',
+            },
+            {
+              label: 'TCP',
+              value: 'TCP',
+            },
+          ],
+        },
+      },
       {
         type: 'textarea',
         label: i18n.t('pages.Clusters.Description'),
diff --git a/inlong-dashboard/src/pages/Clusters/NodeManage.tsx b/inlong-dashboard/src/pages/Clusters/NodeManage.tsx
index 393cf41a1..9368f9128 100644
--- a/inlong-dashboard/src/pages/Clusters/NodeManage.tsx
+++ b/inlong-dashboard/src/pages/Clusters/NodeManage.tsx
@@ -125,6 +125,10 @@ const Comp: React.FC = () => {
         title: i18n.t('pages.Clusters.Node.Port'),
         dataIndex: 'port',
       },
+      {
+        title: i18n.t('pages.Clusters.Node.ProtocolType'),
+        dataIndex: 'protocolType',
+      },
       {
         title: i18n.t('pages.Clusters.Node.LastModifier'),
         dataIndex: 'modifier',
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/ProtocolType.java
similarity index 72%
copy from inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/ProtocolType.java
index 6425348a8..8a761fc8b 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/ProtocolType.java
@@ -15,29 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.common.pojo.dataproxy;
-
-import lombok.Data;
+package org.apache.inlong.manager.common.consts;
 
 /**
- * Data proxy node info.
+ * Constants of protocol type.
  */
-@Data
-public class DataProxyNodeInfo {
-
-    /**
-     * DataProxy node id
-     */
-    private Integer id;
+public class ProtocolType {
 
-    /**
-     * Node IP
-     */
-    private String ip;
+    public static final String TCP = "TCP";
+    public static final String UDP = "UDP";
 
-    /**
-     * Node port
-     */
-    private Integer port;
+    public static final String HTTP = "HTTP";
+    public static final String HTTPS = "HTTPS";
 
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
index 56fdef9e9..a201cbc60 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
@@ -34,6 +34,7 @@ public class InlongClusterNodeEntity implements Serializable {
     private String type;
     private String ip;
     private Integer port;
+    private String protocolType;
     private String extParams;
     private String description;
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
index 29ea012d5..dabc837dd 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
@@ -38,7 +38,8 @@ public interface InlongClusterNodeEntityMapper {
 
     List<InlongClusterNodeEntity> selectByCondition(ClusterPageRequest request);
 
-    List<InlongClusterNodeEntity> selectByParentId(@Param("parentId") Integer parentId);
+    List<InlongClusterNodeEntity> selectByParentId(@Param("parentId") Integer parentId,
+            @Param("protocolType") String protocolType);
 
     int updateById(InlongClusterNodeEntity record);
 
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index c184eab02..294e08845 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -26,6 +26,7 @@
         <result column="type" jdbcType="VARCHAR" property="type"/>
         <result column="ip" jdbcType="VARCHAR" property="ip"/>
         <result column="port" jdbcType="INTEGER" property="port"/>
+        <result column="protocol_type" jdbcType="VARCHAR" property="protocolType"/>
         <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
         <result column="description" jdbcType="VARCHAR" property="description"/>
         <result column="status" jdbcType="INTEGER" property="status"/>
@@ -37,30 +38,32 @@
         <result column="version" jdbcType="INTEGER" property="version"/>
     </resultMap>
     <sql id="Base_Column_List">
-        id, parent_id, type, ip, port, ext_params, description, status, is_deleted,
+        id, parent_id, type, ip, port, protocol_type, ext_params, description, status, is_deleted,
         creator, modifier, create_time, modify_time, version
     </sql>
 
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
             parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
         insert into inlong_cluster_node (id, parent_id, type,
-                                         ip, port, ext_params,
-                                         description, status,
+                                         ip, port, protocol_type,
+                                         ext_params, description, status,
                                          creator, modifier)
         values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR},
-                #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
-                #{description, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
+                #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
+                #{extParams,jdbcType=LONGVARCHAR}, #{description, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
                 #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
     </insert>
 
     <insert id="insertOnDuplicateKeyUpdate" useGeneratedKeys="true" keyProperty="id"
             parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
         insert into inlong_cluster_node (id, parent_id, type,
-                                         ip, port, ext_params,
-                                         status, creator, modifier)
+                                         ip, port, protocol_type,
+                                         ext_params, status, creator,
+                                         modifier)
         values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR},
-                #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
-                #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
+                #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
+                #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR},
+                #{modifier,jdbcType=VARCHAR})
         ON DUPLICATE KEY UPDATE ext_params = VALUES(ext_params),
                                 status     = VALUES(status),
                                 modifier   = VALUES(modifier)
@@ -83,6 +86,7 @@
         and type = #{type, jdbcType=VARCHAR}
         and ip = #{ip, jdbcType=VARCHAR}
         and port = #{port, jdbcType=INTEGER}
+        and protocol_type = #{protocolType, jdbcType=VARCHAR}
     </select>
     <select id="selectByCondition"
             parameterType="org.apache.inlong.manager.pojo.cluster.ClusterPageRequest"
@@ -95,12 +99,9 @@
             <if test="type != null and type != ''">
                 and type = #{type, jdbcType=VARCHAR}
             </if>
-            <if test="parentId != null">
+            <if test="parentId != null and parentId != ''">
                 and parent_id = #{parentId, jdbcType=INTEGER}
             </if>
-            <if test="status != null">
-                and status = #{status, jdbcType=INTEGER}
-            </if>
             <if test="keyword != null and keyword != ''">
                 and (
                 ip like CONCAT('%', #{keyword}, '%')
@@ -114,22 +115,28 @@
         select
         <include refid="Base_Column_List"/>
         from inlong_cluster_node
-        where is_deleted = 0
-        and parent_id = #{parentId, jdbcType=INTEGER}
+        <where>
+            is_deleted = 0
+            and parent_id = #{parentId, jdbcType=INTEGER}
+            <if test="protocolType != null and protocolType != ''">
+                and protocol_type = #{protocolType, jdbcType=VARCHAR}
+            </if>
+        </where>
     </select>
 
     <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
         update inlong_cluster_node
-        set parent_id   = #{parentId,jdbcType=INTEGER},
-            type        = #{type,jdbcType=VARCHAR},
-            ip          = #{ip,jdbcType=VARCHAR},
-            port        = #{port,jdbcType=INTEGER},
-            ext_params  = #{extParams,jdbcType=LONGVARCHAR},
-            description = #{description,jdbcType=VARCHAR},
-            status      = #{status,jdbcType=INTEGER},
-            is_deleted  = #{isDeleted,jdbcType=INTEGER},
-            modifier    = #{modifier,jdbcType=VARCHAR},
-            version     = #{version,jdbcType=INTEGER} + 1
+        set parent_id     = #{parentId,jdbcType=INTEGER},
+            type          = #{type,jdbcType=VARCHAR},
+            ip            = #{ip,jdbcType=VARCHAR},
+            port          = #{port,jdbcType=INTEGER},
+            protocol_type = #{protocolType,jdbcType=VARCHAR},
+            ext_params    = #{extParams,jdbcType=LONGVARCHAR},
+            description   = #{description,jdbcType=VARCHAR},
+            status        = #{status,jdbcType=INTEGER},
+            is_deleted    = #{isDeleted,jdbcType=INTEGER},
+            modifier      = #{modifier,jdbcType=VARCHAR},
+            version       = #{version,jdbcType=INTEGER} + 1
         where id = #{id,jdbcType=INTEGER}
           and version = #{version,jdbcType=INTEGER}
     </update>
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
index 31cc74458..384f8056d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
@@ -52,6 +52,10 @@ public class ClusterNodeRequest {
     @ApiModelProperty(value = "Cluster port")
     private Integer port;
 
+    @NotBlank(message = "protocolType cannot be blank")
+    @ApiModelProperty(value = "Cluster protocol type")
+    private String protocolType;
+
     @ApiModelProperty(value = "Extended params")
     private String extParams;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java
index ee45d9642..cdd426178 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java
@@ -52,6 +52,9 @@ public class ClusterNodeResponse {
     @ApiModelProperty(value = "Cluster port")
     private Integer port;
 
+    @ApiModelProperty(value = "Cluster protocol type")
+    private String protocolType;
+
     @ApiModelProperty(value = "Extended params")
     private String extParams;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 244ee8372..11ca3120f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -199,12 +199,13 @@ public interface InlongClusterService {
     Boolean deleteNode(Integer id, String operator);
 
     /**
-     * Query data proxy nodes by the given inlong group id.
+     * Query data proxy nodes by the given inlong group id and protocol type
      *
      * @param inlongGroupId inlong group id
+     * @param protocolType protocol type
      * @return data proxy node response
      */
-    DataProxyNodeResponse getDataProxyNodes(String inlongGroupId);
+    DataProxyNodeResponse getDataProxyNodes(String inlongGroupId, String protocolType);
 
     /**
      * Get the configuration of DataProxy through the cluster name to which DataProxy belongs.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index e175f8c00..90ba7c1fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -36,7 +36,6 @@ import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.enums.NodeStatus;
 import org.apache.inlong.manager.common.enums.UserTypeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -440,7 +439,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         Preconditions.checkTrue(isInCharge || userEntity.getAccountType().equals(UserTypeEnum.ADMIN.getCode()),
                 "Current user does not have permission to delete cluster info");
 
-        List<InlongClusterNodeEntity> nodeEntities = clusterNodeMapper.selectByParentId(id);
+        List<InlongClusterNodeEntity> nodeEntities = clusterNodeMapper.selectByParentId(id, null);
         if (CollectionUtils.isNotEmpty(nodeEntities)) {
             String errMsg = String.format("there are undeleted nodes under the cluster [%s], "
                     + "please delete the node first", entity.getName());
@@ -533,7 +532,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
                 request.getType());
         List<InlongClusterNodeEntity> allNodeList = new ArrayList<>();
         for (InlongClusterEntity cluster : clusterList) {
-            List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(cluster.getId());
+            List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(cluster.getId(), null);
             allNodeList.addAll(nodeList);
         }
         return CommonBeanUtils.copyListProperties(allNodeList, ClusterNodeResponse::new);
@@ -568,8 +567,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         // check cluster node if exist
         InlongClusterNodeEntity exist = clusterNodeMapper.selectByUniqueKey(request);
         if (exist != null && !Objects.equals(id, exist.getId())) {
-            String errMsg = String.format("inlong cluster node already exist for type=%s ip=%s port=%s",
-                    request.getType(), request.getIp(), request.getPort());
+            String errMsg = "inlong cluster node already exist for " + request;
             LOGGER.error(errMsg);
             throw new BusinessException(errMsg);
         }
@@ -579,8 +577,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
             LOGGER.error("cluster node not found by id={}", id);
             throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
         }
-        String errMsg = String.format("cluster node has already updated with parentId=%s, type=%s, ip=%s, port=%s",
-                request.getParentId(), request.getType(), request.getIp(), request.getPort());
+        String errMsg = "cluster node has already updated for " + request;
         if (!Objects.equals(entity.getVersion(), request.getVersion())) {
             LOGGER.warn(errMsg);
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
@@ -617,8 +614,8 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         entity.setIsDeleted(entity.getId());
         entity.setModifier(operator);
         if (InlongConstants.AFFECTED_ONE_ROW != clusterNodeMapper.updateById(entity)) {
-            LOGGER.error("cluster node has already updated with parentId={}, type={}, ip={}, port={}",
-                    entity.getParentId(), entity.getType(), entity.getIp(), entity.getPort());
+            LOGGER.error("cluster node has already updated with parentId={}, type={}, ip={}, port={}, protocolType={}",
+                    entity.getParentId(), entity.getType(), entity.getIp(), entity.getPort(), entity.getProtocolType());
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
         LOGGER.info("success to delete inlong cluster node by id={}", id);
@@ -626,8 +623,8 @@ public class InlongClusterServiceImpl implements InlongClusterService {
     }
 
     @Override
-    public DataProxyNodeResponse getDataProxyNodes(String inlongGroupId) {
-        LOGGER.debug("begin to get data proxy nodes for inlongGroupId={}", inlongGroupId);
+    public DataProxyNodeResponse getDataProxyNodes(String inlongGroupId, String protocolType) {
+        LOGGER.debug("begin to get data proxy nodes for groupId={}, protocol={}", inlongGroupId, protocolType);
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(inlongGroupId);
         if (groupEntity == null) {
             String msg = "inlong group not exists for inlongGroupId=" + inlongGroupId;
@@ -650,19 +647,17 @@ public class InlongClusterServiceImpl implements InlongClusterService {
             throw new BusinessException(msg);
         }
 
+        // if more than one data proxy cluster, currently takes first
         // TODO consider the data proxy load and re-balance
         List<DataProxyNodeInfo> nodeInfos = new ArrayList<>();
         for (InlongClusterEntity entity : clusterList) {
-            ClusterPageRequest request = ClusterPageRequest.builder()
-                    .parentId(entity.getId())
-                    .status(NodeStatus.NORMAL.getStatus())
-                    .build();
-            List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByCondition(request);
+            List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(entity.getId(), protocolType);
             for (InlongClusterNodeEntity nodeEntity : nodeList) {
                 DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo();
                 nodeInfo.setId(nodeEntity.getId());
                 nodeInfo.setIp(nodeEntity.getIp());
                 nodeInfo.setPort(nodeEntity.getPort());
+                nodeInfo.setProtocolType(nodeEntity.getProtocolType());
                 nodeInfos.add(nodeInfo);
             }
         }
@@ -672,7 +667,8 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         response.setNodeList(nodeInfos);
 
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("success to get data proxy nodes for inlongGroupId={}, result={}", inlongGroupId, response);
+            LOGGER.debug("success to get data proxy nodes for groupId={}, protocol={} result={}",
+                    inlongGroupId, protocolType, response);
         }
         return response;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
index 6027fa908..6ae1f8060 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
@@ -131,6 +131,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         nodeRequest.setType(heartbeat.getComponentType());
         nodeRequest.setIp(heartbeat.getIp());
         nodeRequest.setPort(heartbeat.getPort());
+        nodeRequest.setProtocolType(heartbeat.getProtocolType());
         return clusterNodeMapper.selectByUniqueKey(nodeRequest);
     }
 
@@ -140,6 +141,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         clusterNode.setType(heartbeat.getComponentType());
         clusterNode.setIp(heartbeat.getIp());
         clusterNode.setPort(heartbeat.getPort());
+        clusterNode.setProtocolType(heartbeat.getProtocolType());
         clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
         clusterNode.setCreator(creator);
         clusterNode.setModifier(creator);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 23b2326ce..baba6d294 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -22,6 +22,7 @@ import org.apache.inlong.common.heartbeat.HeartbeatMsg;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
 import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.consts.ProtocolType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
@@ -36,6 +37,8 @@ import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.heartbeat.HeartbeatManager;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.Comparator;
@@ -46,6 +49,7 @@ import java.util.List;
  */
 public class InlongClusterServiceTest extends ServiceBaseTest {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceTest.class);
     @Autowired
     private InlongClusterService clusterService;
     @Autowired
@@ -112,12 +116,13 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
     /**
      * Save cluster node info.
      */
-    public Integer saveClusterNode(Integer parentId, String type, String ip, Integer port) {
+    public Integer saveClusterNode(Integer parentId, String type, String ip, Integer port, String protocolType) {
         ClusterNodeRequest request = new ClusterNodeRequest();
         request.setParentId(parentId);
         request.setType(type);
         request.setIp(ip);
         request.setPort(port);
+        request.setProtocolType(protocolType);
         return clusterService.saveNode(request, GLOBAL_OPERATOR);
     }
 
@@ -149,6 +154,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
         heartbeatMsg.setIp(ip);
         heartbeatMsg.setPort(port);
+        heartbeatMsg.setProtocolType(ProtocolType.HTTP);
         heartbeatMsg.setComponentType(type);
         heartbeatMsg.setReportTime(System.currentTimeMillis());
         heartbeatMsg.setClusterName(clusterName);
@@ -193,7 +199,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         Integer parentId = id;
         String ip = "127.0.0.1";
         Integer port = 8080;
-        Integer nodeId = this.saveClusterNode(parentId, ClusterType.PULSAR, ip, port);
+        Integer nodeId = this.saveClusterNode(parentId, ClusterType.PULSAR, ip, port, ProtocolType.HTTP);
         Assertions.assertNotNull(nodeId);
 
         // list cluster node
@@ -217,7 +223,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
     }
 
     @Test
-    public void testGetDataProxyIp() throws InterruptedException {
+    public void testGetDataProxyIp() {
         String clusterTag = "default_cluster";
         String clusterName = "test_data_proxy";
         String extTag = "ext_1";
@@ -229,11 +235,11 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         // save cluster node
         String ip = "127.0.0.1";
         Integer port1 = 46800;
-        Integer nodeId1 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port1);
+        Integer nodeId1 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port1, ProtocolType.TCP);
         Assertions.assertNotNull(nodeId1);
 
         Integer port2 = 46801;
-        Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2);
+        Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2, ProtocolType.TCP);
         Assertions.assertNotNull(nodeId2);
 
         // report heartbeat
@@ -250,15 +256,27 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         groupService.update(updateGroupInfo.genRequest(), GLOBAL_OPERATOR);
 
         // get the data proxy nodes, the first port should is p1, second port is p2
-        DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId);
-        List<DataProxyNodeInfo> ipList = nodeResponse.getNodeList();
-        ipList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));
-        Assertions.assertEquals(ipList.size(), 2);
-        Assertions.assertEquals(port1, ipList.get(0).getPort());
-        Assertions.assertEquals(port2, ipList.get(1).getPort());
-
-        this.deleteClusterNode(nodeId1);
-        this.deleteClusterNode(nodeId2);
+        DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.TCP);
+        List<DataProxyNodeInfo> nodeInfoList = nodeResponse.getNodeList();
+        nodeInfoList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));
+        Assertions.assertEquals(nodeInfoList.size(), 2);
+        Assertions.assertEquals(port1, nodeInfoList.get(0).getPort());
+        Assertions.assertEquals(port2, nodeInfoList.get(1).getPort());
+
+        nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.HTTP);
+        nodeInfoList = nodeResponse.getNodeList();
+        nodeInfoList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));
+        Assertions.assertEquals(nodeInfoList.size(), 2);
+        Assertions.assertEquals(port1, nodeInfoList.get(0).getPort());
+        Assertions.assertEquals(port2, nodeInfoList.get(1).getPort());
+
+        // delete all cluster nodes
+        // TODO should query by cluster parent id
+        nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, null);
+        for (DataProxyNodeInfo nodeInfo : nodeResponse.getNodeList()) {
+            this.deleteClusterNode(nodeInfo.getId());
+        }
+
         this.deleteCluster(id);
     }
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
index 84fef8e41..f6082528c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.core.heartbeat;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.common.enums.ComponentTypeEnum;
 import org.apache.inlong.common.heartbeat.HeartbeatMsg;
+import org.apache.inlong.manager.common.consts.ProtocolType;
 import org.apache.inlong.manager.common.enums.NodeStatus;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
@@ -63,6 +64,7 @@ public class HeartbeatManagerTest extends ServiceBaseTest {
         nodeRequest.setType(msg.getComponentType());
         nodeRequest.setIp(msg.getIp());
         nodeRequest.setPort(msg.getPort());
+        nodeRequest.setProtocolType(ProtocolType.HTTP);
         InlongClusterNodeEntity clusterNode = clusterNodeMapper.selectByUniqueKey(nodeRequest);
         Assertions.assertNotNull(clusterNode);
         Assertions.assertEquals((int) clusterNode.getStatus(), NodeStatus.NORMAL.getStatus());
@@ -83,6 +85,7 @@ public class HeartbeatManagerTest extends ServiceBaseTest {
         HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
         heartbeatMsg.setIp("127.0.0.1");
         heartbeatMsg.setPort(8008);
+        heartbeatMsg.setProtocolType(ProtocolType.HTTP);
         heartbeatMsg.setComponentType(ComponentTypeEnum.Agent.getName());
         heartbeatMsg.setReportTime(System.currentTimeMillis());
         return heartbeatMsg;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
index f7791c43c..76b331b8c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Maps;
 import org.apache.inlong.common.enums.ComponentTypeEnum;
 import org.apache.inlong.common.heartbeat.GroupHeartbeat;
 import org.apache.inlong.common.heartbeat.StreamHeartbeat;
+import org.apache.inlong.manager.common.consts.ProtocolType;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.heartbeat.HeartbeatQueryRequest;
 import org.apache.inlong.manager.pojo.heartbeat.HeartbeatReportRequest;
@@ -53,6 +54,7 @@ public class HeartbeatServiceTest extends ServiceBaseTest {
         request.setComponentType(ComponentTypeEnum.Agent.getName());
         request.setIp("127.0.0.1");
         request.setReportTime(Instant.now().toEpochMilli());
+        request.setProtocolType(ProtocolType.HTTP);
 
         List<GroupHeartbeat> groupHeartbeats = new ArrayList<>();
         GroupHeartbeat groupHeartbeat = new GroupHeartbeat();
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 760d93316..eaba39763 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -122,22 +122,23 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster`
 -- ----------------------------
 CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
 (
-    `id`          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `parent_id`   int(11)      NOT NULL COMMENT 'Id of the parent cluster',
-    `type`        varchar(20)  NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc',
-    `ip`          varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
-    `port`        int(6)       NULL COMMENT 'Cluster port',
-    `ext_params`  mediumtext            DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
-    `description` varchar(256)          DEFAULT '' COMMENT 'Description of cluster node',
-    `status`      int(4)                DEFAULT '0' COMMENT 'Cluster status',
-    `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `version`     int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    `id`            int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `parent_id`     int(11)      NOT NULL COMMENT 'Id of the parent cluster',
+    `type`          varchar(20)  NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc',
+    `ip`            varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
+    `port`          int(6)       NULL COMMENT 'Cluster port',
+    `protocol_type` varchar(20)  NOT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
+    `ext_params`    mediumtext            DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
+    `description`   varchar(256)          DEFAULT '' COMMENT 'Description of cluster node',
+    `status`        int(4)                DEFAULT '0' COMMENT 'Cluster status',
+    `is_deleted`    int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`       varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`      varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`   timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`   timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`       int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
+    UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `protocol_type`, `is_deleted`)
 );
 
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 825e6841e..699c3a1be 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -132,22 +132,23 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster`
 -- ----------------------------
 CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
 (
-    `id`          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `parent_id`   int(11)      NOT NULL COMMENT 'Id of the parent cluster',
-    `type`        varchar(20)  NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc',
-    `ip`          varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
-    `port`        int(6)       NULL COMMENT 'Cluster port',
-    `ext_params`  mediumtext            DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
-    `description` varchar(256)          DEFAULT '' COMMENT 'Description of cluster node',
-    `status`      int(4)                DEFAULT '0' COMMENT 'Cluster status',
-    `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `version`     int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    `id`            int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `parent_id`     int(11)      NOT NULL COMMENT 'Id of the parent cluster',
+    `type`          varchar(20)  NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc',
+    `ip`            varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
+    `port`          int(6)       NULL COMMENT 'Cluster port',
+    `protocol_type` varchar(20)  NOT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
+    `ext_params`    mediumtext            DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
+    `description`   varchar(256)          DEFAULT '' COMMENT 'Description of cluster node',
+    `status`        int(4)                DEFAULT '0' COMMENT 'Cluster status',
+    `is_deleted`    int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`       varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`      varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`   timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`   timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`       int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
+    UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `protocol_type`, `is_deleted`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster node table';
 
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index 8569b3e5d..25ed2969d 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -50,10 +50,12 @@ public class DataProxyController {
     @Autowired
     private DataProxyConfigRepository dataProxyConfigRepository;
 
+    // TODO protocol type must be provided by the DataProxy
     @PostMapping(value = "/dataproxy/getIpList/{inlongGroupId}")
     @ApiOperation(value = "Get data proxy IP list by InlongGroupId")
-    public Response<DataProxyNodeResponse> getIpList(@PathVariable String inlongGroupId) {
-        return Response.success(clusterService.getDataProxyNodes(inlongGroupId));
+    public Response<DataProxyNodeResponse> getIpList(@PathVariable String inlongGroupId,
+            @RequestParam(required = false) String protocolType) {
+        return Response.success(clusterService.getDataProxyNodes(inlongGroupId, protocolType));
     }
 
     @PostMapping("/dataproxy/getConfig")