You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/05 10:35:26 UTC

[inlong] branch master updated: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bfc7e3b3a [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
bfc7e3b3a is described below

commit bfc7e3b3a13c03878708e2ca995f121593ce3061
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Thu Jan 5 18:35:19 2023 +0800

    [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
---
 .../inlong/agent/constant/AgentConstants.java      |  2 +-
 .../apache/inlong/agent/core/HeartbeatManager.java |  8 +-
 inlong-agent/conf/agent.properties                 |  2 +-
 .../inlong/common/heartbeat/HeartbeatMsg.java      |  4 +-
 .../inlong/manager/client/api/InlongClient.java    |  9 ---
 .../manager/client/api/impl/InlongClientImpl.java  |  6 --
 .../api/inner/client/InlongClusterClient.java      | 13 ---
 .../client/api/service/InlongClusterApi.java       |  4 -
 .../manager/common/consts/AgentConstants.java}     | 33 +-------
 .../dao/entity/InlongClusterNodeEntity.java        |  1 -
 .../manager/dao/entity/StreamSourceEntity.java     |  2 +-
 .../mappers/InlongClusterNodeEntityMapper.xml      | 13 ++-
 .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++--
 .../AgentClusterNodeBindGroupRequest.java}         | 19 ++---
 .../inlong/manager/pojo/source/SourceRequest.java  |  4 +-
 .../service/cluster/InlongClusterService.java      | 10 ---
 .../service/cluster/InlongClusterServiceImpl.java  | 55 -------------
 .../inlong/manager/service/core/AgentService.java  |  8 ++
 .../service/core/impl/AgentServiceImpl.java        | 92 +++++++++++++++++++---
 .../service/heartbeat/HeartbeatManager.java        | 24 +++++-
 .../service/core/impl/AgentServiceTest.java        | 74 ++++++++---------
 .../inlong/manager/service/mocks/MockAgent.java    | 12 +--
 .../main/resources/h2/apache_inlong_manager.sql    |  3 +-
 .../manager-web/sql/apache_inlong_manager.sql      |  3 +-
 inlong-manager/manager-web/sql/changes-1.5.0.sql   |  6 +-
 .../web/controller/InlongClusterController.java    |  9 ---
 .../web/controller/openapi/AgentController.java    |  6 ++
 27 files changed, 195 insertions(+), 241 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 2933b907b..eceb0dbf3 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -110,7 +110,7 @@ public class AgentConstants {
     public static final String AGENT_LOCAL_UUID = "agent.local.uuid";
     public static final String AGENT_LOCAL_UUID_OPEN = "agent.local.uuid.open";
     public static final Boolean DEFAULT_AGENT_LOCAL_UUID_OPEN = false;
-    public static final String AGENT_NODE_TAG = "agent.node.tag";
+    public static final String AGENT_NODE_GROUP = "agent.node.group";
 
     public static final String PROMETHEUS_EXPORTER_PORT = "agent.prometheus.exporter.port";
     public static final int DEFAULT_PROMETHEUS_EXPORTER_PORT = 8080;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 067c92361..9451f8b8d 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -51,7 +51,7 @@ import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_C
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HTTP_PORT;
-import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_TAG;
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP;
 import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP_PORT;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
@@ -211,7 +211,7 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea
         final String clusterName = conf.get(AGENT_CLUSTER_NAME);
         final String clusterTag = conf.get(AGENT_CLUSTER_TAG);
         final String inCharges = conf.get(AGENT_CLUSTER_IN_CHARGES);
-        final String nodeTag = conf.get(AGENT_NODE_TAG);
+        final String nodeGroup = conf.get(AGENT_NODE_GROUP);
 
         HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
         heartbeatMsg.setIp(agentIp);
@@ -227,8 +227,8 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea
         if (StringUtils.isNotBlank(inCharges)) {
             heartbeatMsg.setInCharges(inCharges);
         }
-        if (StringUtils.isNotBlank(nodeTag)) {
-            heartbeatMsg.setNodeTag(nodeTag);
+        if (StringUtils.isNotBlank(nodeGroup)) {
+            heartbeatMsg.setNodeGroup(nodeGroup);
         }
 
         Map<String, JobWrapper> jobWrapperMap = jobmanager.getJobs();
diff --git a/inlong-agent/conf/agent.properties b/inlong-agent/conf/agent.properties
index ee0942a22..e8dfd5911 100755
--- a/inlong-agent/conf/agent.properties
+++ b/inlong-agent/conf/agent.properties
@@ -42,7 +42,7 @@ thread.pool.await.time=30
 agent.local.ip=127.0.0.1
 agent.local.uuid=
 agent.local.uuid.open=false
-agent.node.tag=default_tag
+agent.node.group=default_group
 agent.enable.oom.exit=false
 agent.custom.fixed.ip=
 # max capacity of memory channel
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
index 31d1eee65..3a54b714f 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
@@ -76,9 +76,9 @@ public class HeartbeatMsg {
     private String clusterTag;
 
     /**
-      * Tag of node, separated by commas(,)
+      * Group of node for filtering stream source collect task, separated by commas(,)
       */
-    private String nodeTag;
+    private String nodeGroup;
 
     /**
      * Ext tag of cluster, key=value pairs seperated by &
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index a1be17a92..d65660f3e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.client.api;
 import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -261,12 +260,4 @@ public interface InlongClient {
      * @return whether succeed
      */
     Boolean deleteNode(Integer id);
-
-    /**
-     * Bind or unbind cluster tag node for cluster node.
-     *
-     * @param request cluster info to be modified
-     * @return whether succeed
-     */
-    Boolean bindNodeTag(ClusterNodeBindTagRequest request);
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 69a538eea..c8f988e44 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -39,7 +39,6 @@ import org.apache.inlong.manager.common.util.HttpUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -288,11 +287,6 @@ public class InlongClientImpl implements InlongClient {
         return clusterClient.deleteNode(id);
     }
 
-    @Override
-    public Boolean bindNodeTag(ClusterNodeBindTagRequest request) {
-        return clusterClient.bindNodeTag(request);
-    }
-
     private SimpleGroupStatus recheckGroupStatus(SimpleGroupStatus groupStatus, List<StreamSource> sources) {
         Map<SimpleSourceStatus, List<StreamSource>> statusListMap = Maps.newHashMap();
         sources.forEach(source -> {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
index ecf271102..7140bf595 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
@@ -23,7 +23,6 @@ import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -297,16 +296,4 @@ public class InlongClusterClient {
         ClientUtils.assertRespSuccess(response);
         return response.getData();
     }
-
-    /**
-     * Bind or unbind cluster tag node for cluster node.
-     *
-     * @param request cluster info to be modified
-     * @return whether succeed
-     */
-    public Boolean bindNodeTag(ClusterNodeBindTagRequest request) {
-        Response<Boolean> response = ClientUtils.executeHttpCall(inlongClusterApi.bindNodeTag(request));
-        ClientUtils.assertRespSuccess(response);
-        return response.getData();
-    }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
index 79b98bf9b..776aacec0 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.client.api.service;
 
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -99,7 +98,4 @@ public interface InlongClusterApi {
 
     @DELETE("cluster/node/delete/{id}")
     Call<Response<Boolean>> deleteNode(@Path("id") Integer id);
-
-    @POST("cluster/node/bindTag")
-    Call<Response<Boolean>> bindNodeTag(@Body ClusterNodeBindTagRequest request);
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
similarity index 50%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
index a084f72fb..85da5a876 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
@@ -15,37 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.dao.entity;
-
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.Date;
+package org.apache.inlong.manager.common.consts;
 
 /**
- * Inlong cluster node entity, including parent id, type, ip, etc.
+ * Constant class for agent ext params
  */
-@Data
-public class InlongClusterNodeEntity implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-    private Integer id;
-    private Integer parentId;
-    private String type;
-    private String ip;
-    private Integer port;
-    private String protocolType;
-    private Integer nodeLoad;
-    private String nodeTags;
-    private String extParams;
-    private String description;
-
-    private Integer status;
-    private Integer isDeleted;
-    private String creator;
-    private String modifier;
-    private Date createTime;
-    private Date modifyTime;
-    private Integer version;
+public class AgentConstants {
 
+    public static final String AGENT_GROUP_KEY = "agentGroup";
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
index a084f72fb..b57aa6d26 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
@@ -36,7 +36,6 @@ public class InlongClusterNodeEntity implements Serializable {
     private Integer port;
     private String protocolType;
     private Integer nodeLoad;
-    private String nodeTags;
     private String extParams;
     private String description;
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index c4de96c0f..561270f63 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -42,7 +42,7 @@ public class StreamSourceEntity implements Serializable {
 
     private String dataNodeName;
     private String inlongClusterName;
-    private String inlongClusterNodeTag;
+    private String inlongClusterNodeGroup;
     private String serializationType;
     private String snapshot;
     private Date reportTime;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 491dba3cd..6465d3c91 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -28,7 +28,6 @@
         <result column="port" jdbcType="INTEGER" property="port"/>
         <result column="protocol_type" jdbcType="VARCHAR" property="protocolType"/>
         <result column="node_load" jdbcType="INTEGER" property="nodeLoad"/>
-        <result column="node_tags" jdbcType="VARCHAR" property="nodeTags"/>
         <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
         <result column="description" jdbcType="VARCHAR" property="description"/>
         <result column="status" jdbcType="INTEGER" property="status"/>
@@ -40,7 +39,7 @@
         <result column="version" jdbcType="INTEGER" property="version"/>
     </resultMap>
     <sql id="Base_Column_List">
-        id, parent_id, type, ip, port, protocol_type, node_load, node_tags, ext_params, description,
+        id, parent_id, type, ip, port, protocol_type, node_load, ext_params, description,
         status, is_deleted, creator, modifier, create_time, modify_time, version
     </sql>
 
@@ -48,12 +47,12 @@
             parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
         insert into inlong_cluster_node (id, parent_id, type,
                                          ip, port, protocol_type,
-                                         node_load, node_tags, ext_params,
+                                         node_load, ext_params,
                                          description, status,
                                          creator, modifier)
         values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR},
                 #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
-                #{nodeLoad,jdbcType=INTEGER}, #{nodeTags,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+                #{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
                 #{description, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
                 #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
     </insert>
@@ -62,14 +61,13 @@
             parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
         insert into inlong_cluster_node (id, parent_id, type,
                                          ip, port, protocol_type,
-                                         node_load, node_tags, ext_params, status,
+                                         node_load, ext_params, status,
                                          creator, modifier)
         values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR},
                 #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
-                #{nodeLoad,jdbcType=INTEGER}, #{nodeTags,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+                #{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
                 #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
         ON DUPLICATE KEY UPDATE node_load  = VALUES(node_load),
-                                node_tags  = VALUES(node_tags),
                                 ext_params = VALUES(ext_params),
                                 status     = VALUES(status),
                                 modifier   = VALUES(modifier)
@@ -143,7 +141,6 @@
             port          = #{port,jdbcType=INTEGER},
             protocol_type = #{protocolType,jdbcType=VARCHAR},
             node_load     = #{nodeLoad,jdbcType=INTEGER},
-            node_tags     = #{nodeTags,jdbcType=VARCHAR},
             ext_params    = #{extParams,jdbcType=LONGVARCHAR},
             description   = #{description,jdbcType=VARCHAR},
             status        = #{status,jdbcType=INTEGER},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 7ba947b1f..2858e986a 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -31,7 +31,7 @@
         <result column="uuid" jdbcType="VARCHAR" property="uuid"/>
         <result column="data_node_name" jdbcType="VARCHAR" property="dataNodeName"/>
         <result column="inlong_cluster_name" jdbcType="VARCHAR" property="inlongClusterName"/>
-        <result column="inlong_cluster_node_tag" jdbcType="VARCHAR" property="inlongClusterNodeTag"/>
+        <result column="inlong_cluster_node_group" jdbcType="VARCHAR" property="inlongClusterNodeGroup"/>
         <result column="serialization_type" jdbcType="VARCHAR" property="serializationType"/>
         <result column="snapshot" jdbcType="LONGVARCHAR" property="snapshot"/>
         <result column="report_time" jdbcType="TIMESTAMP" property="reportTime"/>
@@ -47,7 +47,7 @@
     </resultMap>
     <sql id="Base_Column_List">
         id, inlong_group_id, inlong_stream_id, source_type, source_name, template_id, agent_ip, uuid,
-        data_node_name, inlong_cluster_name, inlong_cluster_node_tag, serialization_type, snapshot, report_time,
+        data_node_name, inlong_cluster_name, inlong_cluster_node_group, serialization_type, snapshot, report_time,
         ext_params, version, status, previous_status, is_deleted, creator, modifier, create_time, modify_time
     </sql>
 
@@ -55,14 +55,14 @@
             parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         insert into stream_source (inlong_group_id, inlong_stream_id,
                                    source_type, source_name, template_id, agent_ip,
-                                   uuid, data_node_name, inlong_cluster_name, inlong_cluster_node_tag,
+                                   uuid, data_node_name, inlong_cluster_name, inlong_cluster_node_group,
                                    serialization_type, snapshot,
                                    report_time, ext_params, status,
                                    previous_status, creator, modifier)
         values (#{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER},
                 #{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR}, #{dataNodeName,jdbcType=VARCHAR},
-                #{inlongClusterName,jdbcType=VARCHAR}, #{inlongClusterNodeTag,jdbcType=VARCHAR},
+                #{inlongClusterName,jdbcType=VARCHAR}, #{inlongClusterNodeGroup,jdbcType=VARCHAR},
                 #{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},
                 #{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
                 #{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
@@ -337,8 +337,8 @@
             <if test="inlongClusterName != null">
                 inlong_cluster_name = #{inlongClusterName,jdbcType=INTEGER},
             </if>
-            <if test="inlongClusterNodeTag != null">
-                inlong_cluster_node_tag = #{inlongClusterNodeTag,jdbcType=INTEGER},
+            <if test="inlongClusterNodeGroup != null">
+                inlong_cluster_node_group = #{inlongClusterNodeGroup,jdbcType=INTEGER},
             </if>
             <if test="serializationType != null">
                 serialization_type = #{serializationType,jdbcType=VARCHAR},
@@ -382,7 +382,7 @@
             uuid                = #{uuid,jdbcType=VARCHAR},
             data_node_name      = #{dataNodeName,jdbcType=VARCHAR},
             inlong_cluster_name = #{inlongClusterName,jdbcType=VARCHAR},
-            inlong_cluster_node_tag = #{inlongClusterNodeTag,jdbcType=VARCHAR},
+            inlong_cluster_node_group = #{inlongClusterNodeGroup,jdbcType=VARCHAR},
             serialization_type  = #{serializationType,jdbcType=VARCHAR},
             snapshot            = #{snapshot,jdbcType=LONGVARCHAR},
             report_time         = #{reportTime,jdbcType=TIMESTAMP},
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindTagRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeBindGroupRequest.java
similarity index 72%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindTagRequest.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeBindGroupRequest.java
index ad6fe0673..493c03ab4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindTagRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeBindGroupRequest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.cluster;
+package org.apache.inlong.manager.pojo.cluster.agent;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
@@ -25,24 +25,21 @@ import javax.validation.constraints.NotBlank;
 import java.util.List;
 
 /**
- * Inlong cluster node bind or unbind tag request
+ * Inlong cluster node bind or unbind group. Group is used to distinguish which stream source tasks are collected
  */
 @Data
-@ApiModel("Cluster node bind and unbind tag request")
-public class ClusterNodeBindTagRequest {
+@ApiModel("Cluster node bind and unbind stream source label request, stream source label is a filter to judge "
+        + "whether to accept the stream source task")
+public class AgentClusterNodeBindGroupRequest {
 
-    @NotBlank(message = "Cluster nodeTag cannot be blank")
-    @ApiModelProperty(value = "Cluster node tag")
-    private String clusterNodeTag;
+    @NotBlank(message = "Cluster agent group cannot be blank")
+    @ApiModelProperty(value = "Cluster agent group")
+    private String agentGroup;
 
     @NotBlank(message = "clusterName cannot be blank")
     @ApiModelProperty(value = "Cluster name")
     private String clusterName;
 
-    @NotBlank(message = "type cannot be blank")
-    @ApiModelProperty(value = "Cluster type, including AGENT, DATAPROXY, etc.")
-    private String type;
-
     @ApiModelProperty(value = "Cluster node ip list which needs to bind tag")
     private List<String> bindClusterNodes;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index 099ab45e7..f0adcdd5e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -81,10 +81,10 @@ public class SourceRequest {
     @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongClusterName;
 
-    @ApiModelProperty("Inlong cluster node tag")
+    @ApiModelProperty("Inlong cluster node label for filtering stream source collect task")
     @Length(min = 1, max = 128, message = "length must be between 1 and 128")
     @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
-    private String inlongClusterNodeTag;
+    private String inlongClusterNodeGroup;
 
     @ApiModelProperty("Data node name")
     @Length(max = 128, message = "length must be less than or equal to 128")
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 8ecccbc8b..fac486088 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -21,7 +21,6 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -386,15 +385,6 @@ public interface InlongClusterService {
      */
     Boolean deleteNode(Integer id, String operator);
 
-    /**
-     * Bind or unbind cluster tag node for cluster node.
-     *
-     * @param request cluster info to be modified
-     * @param operator current operator
-     * @return whether succeed
-     */
-    Boolean bindNodeTag(ClusterNodeBindTagRequest request, String operator);
-
     /**
      * Delete cluster node.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index a76ec6dcd..598dd10a0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -54,7 +54,6 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.UserEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -1390,60 +1389,6 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         return true;
     }
 
-    @Override
-    public Boolean bindNodeTag(ClusterNodeBindTagRequest request, String operator) {
-        HashSet<String> bindSet = Sets.newHashSet();
-        HashSet<String> unbindSet = Sets.newHashSet();
-        if (request.getBindClusterNodes() != null) {
-            bindSet.addAll(request.getBindClusterNodes());
-        }
-        if (request.getUnbindClusterNodes() != null) {
-            unbindSet.addAll(request.getUnbindClusterNodes());
-        }
-        Preconditions.checkTrue(Sets.union(bindSet, unbindSet).size() == bindSet.size() + unbindSet.size(),
-                "can not add and del node tag in the sameTime");
-        InlongClusterEntity cluster = clusterMapper.selectByNameAndType(request.getClusterName(), request.getType());
-        String message = "Current user does not have permission to bind cluster node tag";
-        checkUser(cluster, operator, message);
-
-        if (CollectionUtils.isNotEmpty(bindSet)) {
-            bindSet.stream().flatMap(clusterNode -> {
-                ClusterPageRequest pageRequest = new ClusterPageRequest();
-                pageRequest.setParentId(cluster.getId());
-                pageRequest.setType(request.getType());
-                pageRequest.setKeyword(clusterNode);
-                return clusterNodeMapper.selectByCondition(pageRequest).stream();
-            }).filter(entity -> entity != null)
-                    .forEach(entity -> {
-                        String nodeTags = entity.getNodeTags();
-                        Set<String> tagSet = nodeTags == null ? Sets.newHashSet()
-                                : Sets.newHashSet(entity.getNodeTags().split(InlongConstants.COMMA));
-                        tagSet.add(request.getClusterNodeTag());
-                        entity.setNodeTags(String.join(InlongConstants.COMMA, tagSet));
-                        clusterNodeMapper.updateById(entity);
-                    });
-        }
-
-        if (CollectionUtils.isNotEmpty(unbindSet)) {
-            unbindSet.stream().flatMap(clusterNode -> {
-                ClusterPageRequest pageRequest = new ClusterPageRequest();
-                pageRequest.setParentId(cluster.getId());
-                pageRequest.setType(request.getType());
-                pageRequest.setKeyword(clusterNode);
-                return clusterNodeMapper.selectByCondition(pageRequest).stream();
-            }).filter(entity -> entity != null)
-                    .forEach(entity -> {
-                        String nodeTags = entity.getNodeTags();
-                        Set<String> tagSet = nodeTags == null ? Sets.newHashSet()
-                                : Sets.newHashSet(entity.getNodeTags().split(InlongConstants.COMMA));
-                        tagSet.remove(request.getClusterNodeTag());
-                        entity.setNodeTags(String.join(InlongConstants.COMMA, tagSet));
-                        clusterNodeMapper.updateById(entity);
-                    });
-        }
-        return true;
-    }
-
     @Override
     public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolType) {
         LOGGER.debug("begin to get data proxy nodes for groupId={}, protocol={}", groupId, protocolType);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
index e7db76937..099ebca72 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.core;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
 
 /**
  * The service interface for agent
@@ -49,4 +50,11 @@ public interface AgentService {
      */
     TaskResult getTaskResult(TaskRequest request);
 
+    /**
+     * Bind agent cluster nodes to, or unbind them from, an agent group. Stream source tasks that name
+     * a node group are matched against the groups bound to each agent node.
+     *
+     * @param request request naming the agent cluster, the agent group, and the nodes to bind or unbind
+     * @return whether the operation succeeded
+     */
+    Boolean bindGroup(AgentClusterNodeBindGroupRequest request);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index f6e1776ce..0efd42c32 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.core.impl;
 
 import com.google.common.collect.Lists;
+import com.google.gson.Gson;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -33,6 +34,7 @@ import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
 import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+import org.apache.inlong.manager.common.consts.AgentConstants;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.manager.common.consts.SourceType;
@@ -54,6 +56,7 @@ import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
 import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
@@ -75,6 +78,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -92,6 +96,7 @@ public class AgentServiceImpl implements AgentService {
     private static final int ISSUED_STATUS = 3;
     private static final int MODULUS_100 = 100;
     private static final int TASK_FETCH_SIZE = 2;
+    private static final Gson GSON = new Gson();
 
     @Autowired
     private StreamSourceEntityMapper sourceMapper;
@@ -192,6 +197,66 @@ public class AgentServiceImpl implements AgentService {
         return TaskResult.builder().dataConfigs(tasks).cmdConfigs(cmdConfigs).build();
     }
 
+    /**
+     * Bind the agent group to, and/or unbind it from, nodes of the named agent cluster.
+     *
+     * <p>Each entry of the bind/unbind lists is used as a keyword to look up cluster nodes; the agent
+     * group is then added to or removed from the comma-separated group list stored under
+     * {@link AgentConstants#AGENT_GROUP_KEY} in the ext params of every matched node.
+     *
+     * @param request cluster name, agent group, and the node keywords to bind and unbind
+     * @return always true when no exception is thrown
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
+    public Boolean bindGroup(AgentClusterNodeBindGroupRequest request) {
+        Set<String> bindSet = new HashSet<>();
+        Set<String> unbindSet = new HashSet<>();
+        if (request.getBindClusterNodes() != null) {
+            bindSet.addAll(request.getBindClusterNodes());
+        }
+        if (request.getUnbindClusterNodes() != null) {
+            unbindSet.addAll(request.getUnbindClusterNodes());
+        }
+        // the union is smaller than the sum of both sizes exactly when a node is listed in both sets
+        Preconditions.checkTrue(Sets.union(bindSet, unbindSet).size() == bindSet.size() + unbindSet.size(),
+                "can not bind and unbind agent group for the same node at the same time");
+        InlongClusterEntity cluster = clusterMapper.selectByNameAndType(request.getClusterName(), ClusterType.AGENT);
+
+        updateAgentGroup(cluster, bindSet, request.getAgentGroup(), true);
+        updateAgentGroup(cluster, unbindSet, request.getAgentGroup(), false);
+        return true;
+    }
+
+    /**
+     * Add the agent group to, or remove it from, every cluster node matched by the given keywords.
+     *
+     * @param cluster agent cluster owning the nodes; must be non-null when there are keywords to process
+     * @param nodeKeywords keywords used to select the cluster nodes
+     * @param agentGroup agent group to add or remove
+     * @param bind true to add the group, false to remove it
+     */
+    private void updateAgentGroup(InlongClusterEntity cluster, Set<String> nodeKeywords, String agentGroup,
+            boolean bind) {
+        if (CollectionUtils.isEmpty(nodeKeywords)) {
+            return;
+        }
+        // fail with a clear message instead of a NullPointerException on cluster.getId()
+        Preconditions.checkNotNull(cluster, "agent cluster not found for the bind group request");
+        for (String keyword : nodeKeywords) {
+            ClusterPageRequest pageRequest = new ClusterPageRequest();
+            pageRequest.setParentId(cluster.getId());
+            pageRequest.setType(ClusterType.AGENT);
+            pageRequest.setKeyword(keyword);
+            for (InlongClusterNodeEntity entity : clusterNodeMapper.selectByCondition(pageRequest)) {
+                if (entity == null) {
+                    continue;
+                }
+                // parse into a JsonObject rather than a raw Map, so that the other ext params
+                // (numbers, nested objects) are written back in their original form
+                com.google.gson.JsonObject extParams = entity.getExtParams() == null ? null
+                        : GSON.fromJson(entity.getExtParams(), com.google.gson.JsonObject.class);
+                if (extParams == null) {
+                    extParams = new com.google.gson.JsonObject();
+                }
+                Set<String> groupSet = new HashSet<>();
+                if (extParams.has(AgentConstants.AGENT_GROUP_KEY)
+                        && !extParams.get(AgentConstants.AGENT_GROUP_KEY).isJsonNull()) {
+                    String savedGroups = extParams.get(AgentConstants.AGENT_GROUP_KEY).getAsString();
+                    for (String savedGroup : savedGroups.split(InlongConstants.COMMA)) {
+                        // splitting an empty list yields one empty token, which is not a group
+                        if (StringUtils.isNotBlank(savedGroup)) {
+                            groupSet.add(savedGroup);
+                        }
+                    }
+                }
+                if (bind) {
+                    groupSet.add(agentGroup);
+                } else {
+                    groupSet.remove(agentGroup);
+                }
+                extParams.addProperty(AgentConstants.AGENT_GROUP_KEY,
+                        String.join(InlongConstants.COMMA, groupSet));
+                entity.setExtParams(GSON.toJson(extParams));
+                clusterNodeMapper.insertOnDuplicateKeyUpdate(entity);
+            }
+        }
+    }
+
     /**
      * Query the tasks that source is waited to be operated.(only clusterName and ip matched it can be operated)
      *
@@ -251,7 +316,7 @@ public class AgentServiceImpl implements AgentService {
 
     private void preProcessFileTask(TaskRequest taskRequest) {
         preProcessTemplateFileTask(taskRequest);
-        preProcessTagFileTasks(taskRequest);
+        preProcessLabelFileTasks(taskRequest);
     }
 
     /**
@@ -306,7 +371,7 @@ public class AgentServiceImpl implements AgentService {
      *
      * @param taskRequest
      */
-    private void preProcessTagFileTasks(TaskRequest taskRequest) {
+    private void preProcessLabelFileTasks(TaskRequest taskRequest) {
         List<Integer> needProcessedStatusList = Arrays.asList(
                 SourceStatus.SOURCE_NORMAL.getCode(),
                 SourceStatus.SOURCE_FAILED.getCode(),
@@ -329,7 +394,7 @@ public class AgentServiceImpl implements AgentService {
                     Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet(
                             SourceStatus.SOURCE_FROZEN,
                             SourceStatus.TO_BE_ISSUED_FROZEN);
-                    if (!matchTag(sourceEntity, clusterNodeEntity)
+                    if (!matchLabel(sourceEntity, clusterNodeEntity)
                             && !exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))) {
                         LOGGER.info("Transform task({}) from {} to {} because tag mismatch "
                                 + "for agent({}) in cluster({})", sourceEntity.getAgentIp(),
@@ -348,7 +413,7 @@ public class AgentServiceImpl implements AgentService {
                             SourceStatus.TO_BE_ISSUED_ACTIVE);
                     Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet(
                             StreamStatus.SUSPENDED, StreamStatus.SUSPENDED);
-                    if (matchTag(sourceEntity, clusterNodeEntity)
+                    if (matchLabel(sourceEntity, clusterNodeEntity)
                             && !exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))
                             && !exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus()))) {
                         LOGGER.info("Transform task({}) from {} to {} because tag rematch "
@@ -514,20 +579,21 @@ public class AgentServiceImpl implements AgentService {
         }).collect(Collectors.toList());
     }
 
-    private boolean matchTag(StreamSourceEntity sourceEntity, InlongClusterNodeEntity clusterNodeEntity) {
+    private boolean matchLabel(StreamSourceEntity sourceEntity, InlongClusterNodeEntity clusterNodeEntity) {
         Preconditions.checkNotNull(sourceEntity, "cluster must be valid");
-        if (sourceEntity.getInlongClusterNodeTag() == null) {
+        if (sourceEntity.getInlongClusterNodeGroup() == null) {
             return true;
         }
-        if (clusterNodeEntity == null || clusterNodeEntity.getNodeTags() == null) {
+
+        if (clusterNodeEntity == null || clusterNodeEntity.getExtParams() == null) {
             return false;
         }
 
-        Set<String> nodeTags = Stream.of(
-                clusterNodeEntity.getNodeTags().split(InlongConstants.COMMA)).collect(Collectors.toSet());
-        Set<String> sourceTags = Stream.of(
-                sourceEntity.getInlongClusterNodeTag().split(InlongConstants.COMMA)).collect(Collectors.toSet());
-        return sourceTags.stream().anyMatch(sourceTag -> nodeTags.contains(sourceTag));
+        Map<String, String> extParams = GSON.fromJson(clusterNodeEntity.getExtParams(), Map.class);
+        Set<String> clusterNodeLabels = !extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>()
+                : Sets.newHashSet(extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA));
+        Set<String> sourceLabels = Stream.of(
+                sourceEntity.getInlongClusterNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet());
+        return sourceLabels.stream().anyMatch(sourceLabel -> clusterNodeLabels.contains(sourceLabel));
     }
-
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 7ccb7db5d..2fb8d5608 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.RemovalCause;
 import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.gson.Gson;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -30,6 +31,7 @@ import org.apache.inlong.common.enums.NodeSrvStatus;
 import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
 import org.apache.inlong.common.heartbeat.ComponentHeartbeat;
 import org.apache.inlong.common.heartbeat.HeartbeatMsg;
+import org.apache.inlong.manager.common.consts.AgentConstants;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterStatus;
 import org.apache.inlong.manager.common.enums.NodeStatus;
@@ -47,14 +49,21 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Component
 public class HeartbeatManager implements AbstractHeartbeatManager {
 
     private static final String AUTO_REGISTERED = "auto registered";
+    private static final Gson GSON = new Gson();
 
     @Getter
     private Cache<ComponentHeartbeat, HeartbeatMsg> heartbeatCache;
@@ -211,21 +220,30 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         clusterNode.setPort(Integer.valueOf(heartbeat.getPort()));
         clusterNode.setProtocolType(heartbeat.getProtocolType());
         clusterNode.setNodeLoad(heartbeat.getLoad());
-        clusterNode.setNodeTags(heartbeat.getNodeTag());
         clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
         clusterNode.setCreator(creator);
         clusterNode.setModifier(creator);
         clusterNode.setDescription(AUTO_REGISTERED);
+        insertOrUpdateLabel(clusterNode, heartbeat);
         return clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode);
     }
 
     private int updateClusterNode(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) {
         clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
         clusterNode.setNodeLoad(heartbeat.getLoad());
-        clusterNode.setNodeTags(heartbeat.getNodeTag());
+        insertOrUpdateLabel(clusterNode, heartbeat);
         return clusterNodeMapper.updateById(clusterNode);
     }
 
+    /**
+     * Store the node groups reported by the heartbeat under {@link AgentConstants#AGENT_GROUP_KEY} in
+     * the ext params of the cluster node, replacing any group list saved under that key before.
+     *
+     * @param clusterNode cluster node whose ext params are rewritten in place
+     * @param heartbeat heartbeat carrying the comma-separated node groups; the group may be null
+     */
+    private void insertOrUpdateLabel(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) {
+        Set<String> groupSet = new HashSet<>();
+        if (heartbeat.getNodeGroup() != null) {
+            // drop blank tokens so that an empty report does not produce an empty-named group
+            groupSet = Arrays.stream(heartbeat.getNodeGroup().split(InlongConstants.COMMA))
+                    .filter(group -> !group.trim().isEmpty())
+                    .collect(Collectors.toSet());
+        }
+        // parse into a JsonObject rather than a raw Map, so that the other ext params
+        // (numbers, nested objects) are written back in their original form
+        com.google.gson.JsonObject extParams = clusterNode.getExtParams() == null ? null
+                : GSON.fromJson(clusterNode.getExtParams(), com.google.gson.JsonObject.class);
+        if (extParams == null) {
+            extParams = new com.google.gson.JsonObject();
+        }
+        extParams.addProperty(AgentConstants.AGENT_GROUP_KEY, String.join(InlongConstants.COMMA, groupSet));
+        clusterNode.setExtParams(GSON.toJson(extParams));
+    }
+
     private int deleteClusterNode(InlongClusterNodeEntity clusterNode) {
         return clusterNodeMapper.deleteById(clusterNode.getId());
     }
@@ -281,6 +299,6 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         if (oldHB == null) {
             return true;
         }
-        return oldHB.getNodeTag() != newHB.getNodeTag() || oldHB.getLoad() != newHB.getLoad();
+        // compare by value: != on String (or boxed) values only compares object references
+        return !java.util.Objects.equals(oldHB.getNodeGroup(), newHB.getNodeGroup())
+                || !java.util.Objects.equals(oldHB.getLoad(), newHB.getLoad());
     }
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index f509f934b..2f85d1c9a 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -78,7 +78,7 @@ class AgentServiceTest extends ServiceBaseTest {
     private InlongStreamEntityMapper streamMapper;
 
     private List<Pair<String, String>> groupStreamCache;
-    private List<String> tagCache;
+    private List<String> groupCache;
 
     /**
      * Save template source
@@ -110,7 +110,7 @@ class AgentServiceTest extends ServiceBaseTest {
     /**
      * mock {@link StreamSourceService#save}
      */
-    public Pair<String, String> saveSource(String tag) {
+    public Pair<String, String> saveSource(String group) {
         String groupId = UUID.randomUUID().toString();
         String streamId = UUID.randomUUID().toString();
         groupStreamCache.add(new ImmutablePair<>(groupId, streamId));
@@ -121,9 +121,9 @@ class AgentServiceTest extends ServiceBaseTest {
         sourceInfo.setInlongStreamId(streamId);
         sourceInfo.setSourceType(SourceType.FILE);
         sourceInfo.setInlongClusterName(MockAgent.CLUSTER_NAME);
-        sourceInfo.setInlongClusterNodeTag(tag);
+        sourceInfo.setInlongClusterNodeGroup(group);
         sourceInfo.setSourceName(
-                String.format("Source task for cluster(%s) and tag(%s)", MockAgent.CLUSTER_NAME, tag));
+                String.format("Source task for cluster(%s) and group(%s)", MockAgent.CLUSTER_NAME, group));
         sourceService.save(sourceInfo, GLOBAL_OPERATOR);
         sourceService.updateStatus(
                 groupId,
@@ -170,7 +170,7 @@ class AgentServiceTest extends ServiceBaseTest {
     @BeforeEach
     public void setupEach() {
         groupStreamCache = new ArrayList<>();
-        tagCache = new ArrayList<>();
+        groupCache = new ArrayList<>();
     }
 
     @AfterEach
@@ -184,27 +184,27 @@ class AgentServiceTest extends ServiceBaseTest {
                     groupStreamCache.stream().map(Pair::getValue).collect(Collectors.toList()));
         }
         groupStreamCache.clear();
-        tagCache.stream().forEach(tag -> bindTag(false, tag));;
+        groupCache.stream().forEach(group -> bindGroup(false, group));;
     }
 
-    private void bindTag(boolean bind, String tag) {
+    private void bindGroup(boolean bind, String group) {
         if (bind) {
-            tagCache.add(tag);
+            groupCache.add(group);
         }
-        agent.bindTag(bind, tag);
+        agent.bindGroup(bind, group);
     }
 
     /**
-     * Test bind tag for node.
+     * Test bind group for node.
      */
     @Test
-    public void testTagMatch() {
-        saveSource("tag1,tag3");
-        saveSource("tag2,tag3");
-        saveSource("tag2,tag3");
-        saveSource("tag4");
-        bindTag(true, "tag1");
-        bindTag(true, "tag2");
+    public void testGroupMatch() {
+        saveSource("group1,group3");
+        saveSource("group2,group3");
+        saveSource("group2,group3");
+        saveSource("group4");
+        bindGroup(true, "group1");
+        bindGroup(true, "group2");
 
         TaskResult taskResult = agent.pullTask();
         Assertions.assertTrue(taskResult.getCmdConfigs().isEmpty());
@@ -220,12 +220,12 @@ class AgentServiceTest extends ServiceBaseTest {
     }
 
     /**
-     * Test node tag mismatch source task and next time rematch source task.
+     * Test node group mismatch source task and next time rematch source task.
      */
     @Test
-    public void testTagMismatchAndRematch() {
-        final Pair<String, String> groupStream = saveSource("tag1,tag3");
-        bindTag(true, "tag1");
+    public void testGroupMismatchAndRematch() {
+        final Pair<String, String> groupStream = saveSource("group1,group3");
+        bindGroup(true, "group1");
 
         agent.pullTask();
         agent.pullTask(); // report last success status
@@ -235,8 +235,8 @@ class AgentServiceTest extends ServiceBaseTest {
                 .findAny()
                 .get()
                 .getId();
-        // unbind tag and mismatch
-        bindTag(false, "tag1");
+        // unbind group and mismatch
+        bindGroup(false, "group1");
         TaskResult t1 = agent.pullTask();
         Assertions.assertEquals(1, t1.getDataConfigs().size());
         Assertions.assertEquals(1, t1.getDataConfigs().stream()
@@ -246,8 +246,8 @@ class AgentServiceTest extends ServiceBaseTest {
         DataConfig d1 = t1.getDataConfigs().get(0);
         Assertions.assertEquals(sourceId, d1.getTaskId());
 
-        // bind tag and rematch
-        bindTag(true, "tag1");
+        // bind group and rematch
+        bindGroup(true, "group1");
         TaskResult t2 = agent.pullTask();
         Assertions.assertEquals(1, t2.getDataConfigs().size());
         Assertions.assertEquals(1, t2.getDataConfigs().stream()
@@ -263,14 +263,14 @@ class AgentServiceTest extends ServiceBaseTest {
      */
     @Test
     public void testSuspendFailWhenNotAck() {
-        Pair<String, String> groupStream = saveSource("tag1,tag3");
-        bindTag(true, "tag1");
+        Pair<String, String> groupStream = saveSource("group1,group3");
+        bindGroup(true, "group1");
 
         agent.pullTask();
         agent.pullTask(); // report last success status
 
         // mismatch
-        bindTag(false, "tag1");
+        bindGroup(false, "group1");
         agent.pullTask();
 
         // suspend
@@ -282,21 +282,21 @@ class AgentServiceTest extends ServiceBaseTest {
     }
 
     /**
-     * Test node tag rematch source task but group suspend
+     * Test node group rematch source task but group suspend
      */
     @Test
     public void testRematchedWhenSuspend() {
-        final Pair<String, String> groupStream = saveSource("tag1,tag3");
-        bindTag(true, "tag1");
+        final Pair<String, String> groupStream = saveSource("group1,group3");
+        bindGroup(true, "group1");
 
         agent.pullTask();
         agent.pullTask(); // report last success status
 
         // mismatch and rematch
-        bindTag(false, "tag1");
+        bindGroup(false, "group1");
         agent.pullTask();
         agent.pullTask(); // report last to make it from 304 -> 104
-        bindTag(true, "tag1");
+        bindGroup(true, "group1");
 
         // suspend
         suspendSource(groupStream.getLeft(), groupStream.getRight());
@@ -305,12 +305,12 @@ class AgentServiceTest extends ServiceBaseTest {
     }
 
     /**
-     * Test node tag mismatch source task but group restart
+     * Test node group mismatch source task but group restart
      */
     @Test
     public void testMismatchedWhenRestart() {
-        final Pair<String, String> groupStream = saveSource("tag1,tag3");
-        bindTag(true, "tag1");
+        final Pair<String, String> groupStream = saveSource("group1,group3");
+        bindGroup(true, "group1");
 
         agent.pullTask();
         agent.pullTask(); // report last success status
@@ -318,7 +318,7 @@ class AgentServiceTest extends ServiceBaseTest {
         // suspend and restart
         suspendSource(groupStream.getLeft(), groupStream.getRight());
         restartSource(groupStream.getLeft(), groupStream.getRight());
-        bindTag(false, "tag1");
+        bindGroup(false, "group1");
         TaskResult taskResult = agent.pullTask();
         Assertions.assertEquals(1, taskResult.getDataConfigs().size());
         Assertions.assertEquals(1, taskResult.getDataConfigs().stream()
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java
index 2acb651f5..9e0061d18 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java
@@ -44,7 +44,7 @@ public class MockAgent {
     // 2. Regularly report the previously executed tasks to the manager (may be successful or fail)
     public static final String LOCAL_IP = "127.0.0.1";
     public static final String LOCAL_PORT = "8008";
-    public static final String LOCAL_TAG = "default_tag";
+    public static final String LOCAL_GROUP = "default_group";
     public static final String CLUSTER_TAG = "default_cluster_tag";
     public static final String CLUSTER_NAME = "1c59ef9e8e1e43cfb25ee8b744c9c81b_2790";
 
@@ -52,7 +52,7 @@ public class MockAgent {
     private HeartbeatService heartbeatService;
 
     private Queue<CommandEntity> commands = new LinkedList<>();
-    private Set<String> tags = Sets.newHashSet(LOCAL_TAG);
+    private Set<String> groups = Sets.newHashSet(LOCAL_GROUP);
     private int jobLimit;
 
     public MockAgent(AgentService agentService, HeartbeatService heartbeatService, int jobLimit) {
@@ -83,17 +83,17 @@ public class MockAgent {
         heartbeat.setComponentType(ComponentTypeEnum.Agent.getType());
         heartbeat.setClusterName(CLUSTER_NAME);
         heartbeat.setClusterTag(CLUSTER_TAG);
-        heartbeat.setNodeTag(tags.stream().collect(Collectors.joining(InlongConstants.COMMA)));
+        heartbeat.setNodeGroup(groups.stream().collect(Collectors.joining(InlongConstants.COMMA)));
         heartbeat.setInCharges(GLOBAL_OPERATOR);
         heartbeat.setReportTime(System.currentTimeMillis());
         heartbeatService.reportHeartbeat(heartbeat);
     }
 
-    public void bindTag(boolean bind, String tag) {
+    public void bindGroup(boolean bind, String group) {
         if (bind) {
-            tags.add(tag);
+            groups.add(group);
         } else {
-            tags.remove(tag);
+            groups.remove(group);
         }
         sendHeartbeat();
     }
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 6af822e01..fc4938469 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -133,7 +133,6 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
     `port`          int(6)       NULL COMMENT 'Cluster port',
     `protocol_type` varchar(20)           DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
     `node_load`     int(11)               DEFAULT '-1' COMMENT 'Current load value of the node',
-    `node_tags`     varchar(512)          DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip',
     `ext_params`    mediumtext            DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
     `description`   varchar(256)          DEFAULT '' COMMENT 'Description of cluster node',
     `status`        int(4)                DEFAULT '0' COMMENT 'Cluster status',
@@ -330,7 +329,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `uuid`                varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
     `data_node_name`      varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
     `inlong_cluster_name` varchar(128)          DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
-    `inlong_cluster_node_tag` varchar(512)      DEFAULT NULL COMMENT 'Cluster node tag',
+    `inlong_cluster_node_group` varchar(512)      DEFAULT NULL COMMENT 'Cluster node group',
     `serialization_type`  varchar(20)           DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
     `snapshot`            mediumtext            DEFAULT NULL COMMENT 'Snapshot of this source task',
     `report_time`         timestamp    NULL COMMENT 'Snapshot time',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index f8bcb47fb..3036a7525 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -143,7 +143,6 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
     `port`          int(6)       NULL COMMENT 'Cluster port',
     `protocol_type` varchar(20)           DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
     `node_load`     int(11)               DEFAULT '-1' COMMENT 'Current load value of the node',
-    `node_tags`     varchar(512)          DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip',
     `ext_params`    mediumtext            DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
     `description`   varchar(256)          DEFAULT '' COMMENT 'Description of cluster node',
     `status`        int(4)                DEFAULT '0' COMMENT 'Cluster status',
@@ -347,7 +346,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `uuid`                varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
     `data_node_name`      varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
     `inlong_cluster_name` varchar(128)          DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
-    `inlong_cluster_node_tag` varchar(512)      DEFAULT NULL COMMENT 'Cluster node tag',
+    `inlong_cluster_node_group` varchar(512)      DEFAULT NULL COMMENT 'Cluster node group',
     `serialization_type`  varchar(20)           DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
     `snapshot`            mediumtext            DEFAULT NULL COMMENT 'Snapshot of this source task',
     `report_time`         timestamp    NULL COMMENT 'Snapshot time',
diff --git a/inlong-manager/manager-web/sql/changes-1.5.0.sql b/inlong-manager/manager-web/sql/changes-1.5.0.sql
index 4b655bc2c..88ed142bd 100644
--- a/inlong-manager/manager-web/sql/changes-1.5.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.5.0.sql
@@ -32,9 +32,5 @@ ALTER TABLE `inlong_cluster_node`
     ADD COLUMN `node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node';
 
 
-ALTER TABLE `inlong_cluster_node`
-    ADD COLUMN `node_tags` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip';
-
-
 ALTER TABLE `stream_source`
-    ADD COLUMN `inlong_cluster_node_tag` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag';
\ No newline at end of file
+    ADD COLUMN `inlong_cluster_node_group` varchar(512) DEFAULT NULL COMMENT 'Cluster node group';
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
index 3ea4634d3..df49e13ea 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
@@ -25,7 +25,6 @@ import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -228,14 +227,6 @@ public class InlongClusterController {
         return Response.success(clusterService.deleteNode(id, LoginUserUtils.getLoginUser().getName()));
     }
 
-    @RequestMapping(value = "/cluster/node/bindTag")
-    @OperationLog(operation = OperationType.UPDATE)
-    @ApiOperation(value = "Bind or unbind cluster node tag")
-    public Response<Boolean> bindNodeTag(@Validated @RequestBody ClusterNodeBindTagRequest request) {
-        String username = LoginUserUtils.getLoginUser().getName();
-        return Response.success(clusterService.bindNodeTag(request, username));
-    }
-
     @PostMapping("/cluster/testConnection")
     @ApiOperation(value = "Test connection for inlong cluster")
     public Response<Boolean> testConnection(@RequestBody ClusterRequest request) {
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
index 88343f940..5e2270637 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.AgentService;
@@ -68,4 +69,9 @@ public class AgentController {
         return Response.success(agentService.getTaskResult(request));
     }
 
+    @PostMapping("/agent/bindGroup")
+    @ApiOperation(value = "Divide the agent into different groups, which collect different stream source tasks.")
+    public Response<Boolean> bindGroup(@RequestBody AgentClusterNodeBindGroupRequest request) {
+        return Response.success(agentService.bindGroup(request));
+    }
 }