You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/05 11:43:52 UTC

[inlong] branch branch-1.5 updated (3db90d470 -> b96d4ed4a)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 3db90d470 [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
     new e26610ff1 [INLONG-7156][Agent] Support directly sending raw file data (#7157)
     new fddfceacb [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160)
     new db296b39d [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
     new 34f7f04cb [INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165)
     new b96d4ed4a [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../inlong/agent/constant/AgentConstants.java      |  2 +-
 .../apache/inlong/agent/core/HeartbeatManager.java |  8 +-
 .../sources/reader/file/FileReaderOperator.java    |  8 +-
 .../apache/inlong/agent/plugin/TestFileAgent.java  |  2 +-
 .../agent/plugin/sources/TestTextFileReader.java   | 34 +++++++-
 .../inlong/agent/plugin/task/TestTextFileTask.java |  6 +-
 .../agent-plugins/src/test/resources/test/3.txt    |  5 ++
 inlong-agent/conf/agent.properties                 |  2 +-
 .../inlong/audit/send/ClientPipelineFactory.java   |  9 +--
 .../apache/inlong/audit/send/SenderChannel.java    |  8 +-
 .../java/org/apache/inlong/audit/util/Config.java  | 13 ++-
 .../inlong/common/heartbeat/HeartbeatMsg.java      |  4 +-
 .../src/components/NodeSelect/index.tsx            |  6 +-
 .../Agent.ts => consumes/defaults/Kafka.ts}        |  6 +-
 .../src/metas/consumes/defaults/index.ts           |  5 ++
 .../src/metas/groups/defaults/Kafka.ts             |  1 +
 .../src/pages/GroupDetail/Audit/config.tsx         |  2 +-
 .../inlong/manager/client/api/InlongClient.java    |  9 ---
 .../manager/client/api/impl/InlongClientImpl.java  |  6 --
 .../api/inner/client/InlongClusterClient.java      | 13 ---
 .../client/api/service/InlongClusterApi.java       |  4 -
 .../manager/common/consts/AgentConstants.java      |  8 +-
 .../dao/entity/InlongClusterNodeEntity.java        |  1 -
 .../manager/dao/entity/StreamSourceEntity.java     |  2 +-
 .../mappers/InlongClusterNodeEntityMapper.xml      | 13 ++-
 .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++--
 .../AgentClusterNodeBindGroupRequest.java}         | 19 ++---
 .../inlong/manager/pojo/source/SourceRequest.java  |  4 +-
 .../service/cluster/InlongClusterService.java      | 10 ---
 .../service/cluster/InlongClusterServiceImpl.java  | 55 -------------
 .../inlong/manager/service/core/AgentService.java  |  8 ++
 .../service/core/impl/AgentServiceImpl.java        | 92 +++++++++++++++++++---
 .../service/heartbeat/HeartbeatManager.java        | 24 +++++-
 .../service/core/impl/AgentServiceTest.java        | 74 ++++++++---------
 .../inlong/manager/service/mocks/MockAgent.java    | 12 +--
 .../main/resources/h2/apache_inlong_manager.sql    |  3 +-
 .../manager-web/sql/apache_inlong_manager.sql      |  3 +-
 inlong-manager/manager-web/sql/changes-1.5.0.sql   |  6 +-
 .../web/controller/InlongClusterController.java    |  9 ---
 .../web/controller/openapi/AgentController.java    |  6 ++
 .../sort/cdc/mysql/source/utils/RecordUtils.java   | 31 ++++++--
 41 files changed, 299 insertions(+), 248 deletions(-)
 create mode 100644 inlong-agent/agent-plugins/src/test/resources/test/3.txt
 copy inlong-dashboard/src/metas/{clusters/defaults/Agent.ts => consumes/defaults/Kafka.ts} (90%)
 copy inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/state/StateCallback.java => inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java (81%)
 mode change 100755 => 100644
 rename inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/{ClusterNodeBindTagRequest.java => agent/AgentClusterNodeBindGroupRequest.java} (72%)


[inlong] 02/05: [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit fddfceacb5119c1709aa3eb9757298fc07d312a5
Author: doleyzi <43...@users.noreply.github.com>
AuthorDate: Thu Jan 5 17:19:38 2023 +0800

    [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160)
    
    Co-authored-by: doleyzi <do...@tencent.com>
---
 .../org/apache/inlong/audit/send/ClientPipelineFactory.java |  9 ++++-----
 .../java/org/apache/inlong/audit/send/SenderChannel.java    |  8 +++++---
 .../src/main/java/org/apache/inlong/audit/util/Config.java  | 13 ++++++-------
 3 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
index d6694b7a1..1800714cf 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
@@ -18,21 +18,20 @@
 package org.apache.inlong.audit.send;
 
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.socket.SocketChannel;
 import org.apache.inlong.audit.util.Decoder;
 
 public class ClientPipelineFactory extends ChannelInitializer<SocketChannel> {
 
-    private final SimpleChannelInboundHandler sendHandler;
+    private SenderManager senderManager;
 
-    public ClientPipelineFactory(SimpleChannelInboundHandler sendHandler) {
-        this.sendHandler = sendHandler;
+    public ClientPipelineFactory(SenderManager senderManager) {
+        this.senderManager = senderManager;
     }
 
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ch.pipeline().addLast("contentDecoder", new Decoder());
-        ch.pipeline().addLast("handler", sendHandler);
+        ch.pipeline().addLast("handler", new SenderHandler(senderManager));
     }
 }
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
index 85f42b4e3..28d0cc35b 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
@@ -130,8 +130,7 @@ public class SenderChannel {
         client.option(ChannelOption.SO_REUSEADDR, true);
         client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE);
         client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE);
-        SenderHandler senderHandler = new SenderHandler(senderManager);
-        client.handler(new ClientPipelineFactory(senderHandler));
+        client.handler(new ClientPipelineFactory(senderManager));
     }
 
     /**
@@ -144,7 +143,10 @@ public class SenderChannel {
             return true;
         }
         try {
-            init();
+            if (client == null) {
+                init();
+            }
+
             synchronized (client) {
                 ChannelFuture future = client.connect(this.ipPort.addr).sync();
                 this.channel = future.channel();
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
index e4acd2878..a18ba1a86 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
@@ -33,6 +33,8 @@ public class Config {
     private static final Logger logger = LoggerFactory.getLogger(Config.class);
     private String localIP = "";
     private String dockerId = "";
+    private static final int CGROUP_FILE_LENGTH = 50;
+    private static final int DOCKERID_LENGTH = 10;
 
     public void init() {
         initIP();
@@ -78,15 +80,12 @@ public class Config {
         }
         try (BufferedReader in = new BufferedReader(new FileReader("/proc/self/cgroup"))) {
             String dockerID = in.readLine();
-            if (dockerID != null) {
-                int n = dockerID.indexOf("/");
-                String dockerID2 = dockerID.substring(n + 1, (dockerID.length() - n - 1));
-                n = dockerID2.indexOf("/");
-                if (dockerID2.length() > 12) {
-                    dockerId = dockerID2.substring(n + 1, 12);
-                }
+            if (dockerID == null || dockerID.length() < CGROUP_FILE_LENGTH) {
                 in.close();
+                return;
             }
+            dockerId = dockerID.substring(dockerID.length() - DOCKERID_LENGTH);
+            in.close();
         } catch (Exception ex) {
             logger.error(ex.toString());
         }


[inlong] 04/05: [INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 34f7f04cb1e64aebf85a8d90574e741dab0aa629
Author: Lizhen <88...@users.noreply.github.com>
AuthorDate: Thu Jan 5 18:43:23 2023 +0800

    [INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165)
---
 .../src/components/NodeSelect/index.tsx            |  6 ++++-
 .../metas/consumes/defaults/{index.ts => Kafka.ts} | 26 ++++++----------------
 .../src/metas/consumes/defaults/index.ts           |  5 +++++
 .../src/metas/groups/defaults/Kafka.ts             |  1 +
 .../src/pages/GroupDetail/Audit/config.tsx         |  2 +-
 5 files changed, 19 insertions(+), 21 deletions(-)

diff --git a/inlong-dashboard/src/components/NodeSelect/index.tsx b/inlong-dashboard/src/components/NodeSelect/index.tsx
index 4e83225e7..5c89e7638 100644
--- a/inlong-dashboard/src/components/NodeSelect/index.tsx
+++ b/inlong-dashboard/src/components/NodeSelect/index.tsx
@@ -51,7 +51,11 @@ const NodeSelect: React.FC<NodeSelectProps> = _props => {
           })),
       },
     },
-    addonAfter: <Link to="/node">{i18n.t('components.NodeSelect.Create')}</Link>,
+    addonAfter: (
+      <Link to="/node" target="_blank">
+        {i18n.t('components.NodeSelect.Create')}
+      </Link>
+    ),
   };
   return <HighSelect {...props} />;
 };
diff --git a/inlong-dashboard/src/metas/consumes/defaults/index.ts b/inlong-dashboard/src/metas/consumes/defaults/Kafka.ts
similarity index 60%
copy from inlong-dashboard/src/metas/consumes/defaults/index.ts
copy to inlong-dashboard/src/metas/consumes/defaults/Kafka.ts
index 214cf092f..27392fcc2 100644
--- a/inlong-dashboard/src/metas/consumes/defaults/index.ts
+++ b/inlong-dashboard/src/metas/consumes/defaults/Kafka.ts
@@ -17,23 +17,11 @@
  * under the License.
  */
 
-import type { MetaExportWithBackendList } from '@/metas/types';
-import type { ConsumeMetaType } from '../types';
+import { DataWithBackend } from '@/metas/DataWithBackend';
+import { RenderRow } from '@/metas/RenderRow';
+import { RenderList } from '@/metas/RenderList';
+import { ConsumeInfo } from '../common/ConsumeInfo';
 
-export const allDefaultConsumes: MetaExportWithBackendList<ConsumeMetaType> = [
-  {
-    label: 'ALL',
-    value: '',
-    LoadEntity: () => import('../common/ConsumeInfo').then(r => ({ default: r.ConsumeInfo })),
-  },
-  {
-    label: 'Pulsar',
-    value: 'PULSAR',
-    LoadEntity: () => import('./Pulsar'),
-  },
-  {
-    label: 'TubeMq',
-    value: 'TUBEMQ',
-    LoadEntity: () => import('./TubeMq'),
-  },
-];
+export default class KafkaConsume
+  extends ConsumeInfo
+  implements DataWithBackend, RenderRow, RenderList {}
diff --git a/inlong-dashboard/src/metas/consumes/defaults/index.ts b/inlong-dashboard/src/metas/consumes/defaults/index.ts
index 214cf092f..0f3195d68 100644
--- a/inlong-dashboard/src/metas/consumes/defaults/index.ts
+++ b/inlong-dashboard/src/metas/consumes/defaults/index.ts
@@ -26,6 +26,11 @@ export const allDefaultConsumes: MetaExportWithBackendList<ConsumeMetaType> = [
     value: '',
     LoadEntity: () => import('../common/ConsumeInfo').then(r => ({ default: r.ConsumeInfo })),
   },
+  {
+    label: 'Kafka',
+    value: 'KAFKA',
+    LoadEntity: () => import('./Kafka'),
+  },
   {
     label: 'Pulsar',
     value: 'PULSAR',
diff --git a/inlong-dashboard/src/metas/groups/defaults/Kafka.ts b/inlong-dashboard/src/metas/groups/defaults/Kafka.ts
index 71fdf8bfa..2106269e3 100644
--- a/inlong-dashboard/src/metas/groups/defaults/Kafka.ts
+++ b/inlong-dashboard/src/metas/groups/defaults/Kafka.ts
@@ -33,6 +33,7 @@ export default class KafkaGroup
   @FieldDecorator({
     type: 'inputnumber',
     rules: [{ required: true }],
+    initialValue: 1,
     extra: i18n.t('meta.Group.Kafka.PartitionExtra'),
     props: {
       min: 1,
diff --git a/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx b/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx
index 10fa221d6..8d6a8b41c 100644
--- a/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx
+++ b/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx
@@ -152,7 +152,7 @@ export const getFormContent = (inlongGroupId, initialValues, onSearch, onDataStr
 
 export const getTableColumns = source => {
   const data = source.map(item => ({
-    title: auditMap[item.auditId]?.label + (item.nodeType || '') || item.auditId,
+    title: auditMap[item.auditId]?.label || item.auditId,
     dataIndex: item.auditId,
     render: text => text || 0,
   }));


[inlong] 05/05: [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit b96d4ed4aa21f917ae2c49833e3a2925c295c3f5
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Thu Jan 5 19:19:02 2023 +0800

    [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)
    
    Co-authored-by: stingpeng <st...@tencent.com>
---
 .../sort/cdc/mysql/source/utils/RecordUtils.java   | 31 ++++++++++++++++------
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index ef7ef4ca9..6944bd795 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -88,7 +88,8 @@ public class RecordUtils {
             List<SourceRecord> sourceRecords,
             SchemaNameAdjuster nameAdjuster) {
         List<SourceRecord> normalizedRecords = new ArrayList<>();
-        Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
+        Map<Struct, SourceRecord> snapshotRecordsWithKey = new HashMap<>();
+        List<SourceRecord> snapshotRecordsWithoutKey = new ArrayList<>();
         List<SourceRecord> binlogRecords = new ArrayList<>();
         if (!sourceRecords.isEmpty()) {
 
@@ -103,7 +104,11 @@ public class RecordUtils {
             for (; i < sourceRecords.size(); i++) {
                 SourceRecord sourceRecord = sourceRecords.get(i);
                 if (!isHighWatermarkEvent(sourceRecord)) {
-                    snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);
+                    if (sourceRecord.key() == null) {
+                        snapshotRecordsWithoutKey.add(sourceRecord);
+                    } else {
+                        snapshotRecordsWithKey.put((Struct) sourceRecord.key(), sourceRecord);
+                    }
                 } else {
                     highWatermark = sourceRecord;
                     i++;
@@ -130,8 +135,11 @@ public class RecordUtils {
                     String.format(
                             "The last record should be high watermark signal event, but is %s",
                             highWatermark));
+
             normalizedRecords =
-                    upsertBinlog(lowWatermark, highWatermark, snapshotRecords, binlogRecords);
+                    upsertBinlog(lowWatermark, highWatermark, snapshotRecordsWithKey,
+                            binlogRecords, snapshotRecordsWithoutKey);
+
         }
         return normalizedRecords;
     }
@@ -139,8 +147,9 @@ public class RecordUtils {
     private static List<SourceRecord> upsertBinlog(
             SourceRecord lowWatermarkEvent,
             SourceRecord highWatermarkEvent,
-            Map<Struct, SourceRecord> snapshotRecords,
-            List<SourceRecord> binlogRecords) {
+            Map<Struct, SourceRecord> snapshotRecordsWithKey,
+            List<SourceRecord> binlogRecords,
+            List<SourceRecord> snapshotRecordsWithoutKey) {
         // upsert binlog events to snapshot events of split
         if (!binlogRecords.isEmpty()) {
             for (SourceRecord binlog : binlogRecords) {
@@ -169,10 +178,10 @@ public class RecordUtils {
                                             binlog.key(),
                                             binlog.valueSchema(),
                                             envelope.read(after, source, fetchTs));
-                            snapshotRecords.put(key, record);
+                            snapshotRecordsWithKey.put(key, record);
                             break;
                         case DELETE:
-                            snapshotRecords.remove(key);
+                            snapshotRecordsWithKey.remove(key);
                             break;
                         case READ:
                             throw new IllegalStateException(
@@ -188,7 +197,13 @@ public class RecordUtils {
 
         final List<SourceRecord> normalizedRecords = new ArrayList<>();
         normalizedRecords.add(lowWatermarkEvent);
-        normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
+        if (!snapshotRecordsWithoutKey.isEmpty()) {
+            // for a table without a key, there is no need for binlog upsert
+            // because highWatermark equals lowWatermark
+            normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithoutKey));
+        } else {
+            normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithKey.values()));
+        }
         normalizedRecords.add(highWatermarkEvent);
 
         return normalizedRecords;


[inlong] 03/05: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit db296b39df069afca8ac6d8755ffc67395e2262a
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Thu Jan 5 18:35:19 2023 +0800

    [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
---
 .../inlong/agent/constant/AgentConstants.java      |  2 +-
 .../apache/inlong/agent/core/HeartbeatManager.java |  8 +-
 inlong-agent/conf/agent.properties                 |  2 +-
 .../inlong/common/heartbeat/HeartbeatMsg.java      |  4 +-
 .../inlong/manager/client/api/InlongClient.java    |  9 ---
 .../manager/client/api/impl/InlongClientImpl.java  |  6 --
 .../api/inner/client/InlongClusterClient.java      | 13 ---
 .../client/api/service/InlongClusterApi.java       |  4 -
 .../manager/common/consts/AgentConstants.java}     | 33 +-------
 .../dao/entity/InlongClusterNodeEntity.java        |  1 -
 .../manager/dao/entity/StreamSourceEntity.java     |  2 +-
 .../mappers/InlongClusterNodeEntityMapper.xml      | 13 ++-
 .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++--
 .../AgentClusterNodeBindGroupRequest.java}         | 19 ++---
 .../inlong/manager/pojo/source/SourceRequest.java  |  4 +-
 .../service/cluster/InlongClusterService.java      | 10 ---
 .../service/cluster/InlongClusterServiceImpl.java  | 55 -------------
 .../inlong/manager/service/core/AgentService.java  |  8 ++
 .../service/core/impl/AgentServiceImpl.java        | 92 +++++++++++++++++++---
 .../service/heartbeat/HeartbeatManager.java        | 24 +++++-
 .../service/core/impl/AgentServiceTest.java        | 74 ++++++++---------
 .../inlong/manager/service/mocks/MockAgent.java    | 12 +--
 .../main/resources/h2/apache_inlong_manager.sql    |  3 +-
 .../manager-web/sql/apache_inlong_manager.sql      |  3 +-
 inlong-manager/manager-web/sql/changes-1.5.0.sql   |  6 +-
 .../web/controller/InlongClusterController.java    |  9 ---
 .../web/controller/openapi/AgentController.java    |  6 ++
 27 files changed, 195 insertions(+), 241 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 2933b907b..eceb0dbf3 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -110,7 +110,7 @@ public class AgentConstants {
     public static final String AGENT_LOCAL_UUID = "agent.local.uuid";
     public static final String AGENT_LOCAL_UUID_OPEN = "agent.local.uuid.open";
     public static final Boolean DEFAULT_AGENT_LOCAL_UUID_OPEN = false;
-    public static final String AGENT_NODE_TAG = "agent.node.tag";
+    public static final String AGENT_NODE_GROUP = "agent.node.group";
 
     public static final String PROMETHEUS_EXPORTER_PORT = "agent.prometheus.exporter.port";
     public static final int DEFAULT_PROMETHEUS_EXPORTER_PORT = 8080;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 067c92361..9451f8b8d 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -51,7 +51,7 @@ import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_C
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HTTP_PORT;
-import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_TAG;
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP;
 import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP_PORT;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
@@ -211,7 +211,7 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea
         final String clusterName = conf.get(AGENT_CLUSTER_NAME);
         final String clusterTag = conf.get(AGENT_CLUSTER_TAG);
         final String inCharges = conf.get(AGENT_CLUSTER_IN_CHARGES);
-        final String nodeTag = conf.get(AGENT_NODE_TAG);
+        final String nodeGroup = conf.get(AGENT_NODE_GROUP);
 
         HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
         heartbeatMsg.setIp(agentIp);
@@ -227,8 +227,8 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea
         if (StringUtils.isNotBlank(inCharges)) {
             heartbeatMsg.setInCharges(inCharges);
         }
-        if (StringUtils.isNotBlank(nodeTag)) {
-            heartbeatMsg.setNodeTag(nodeTag);
+        if (StringUtils.isNotBlank(nodeGroup)) {
+            heartbeatMsg.setNodeGroup(nodeGroup);
         }
 
         Map<String, JobWrapper> jobWrapperMap = jobmanager.getJobs();
diff --git a/inlong-agent/conf/agent.properties b/inlong-agent/conf/agent.properties
index ee0942a22..e8dfd5911 100755
--- a/inlong-agent/conf/agent.properties
+++ b/inlong-agent/conf/agent.properties
@@ -42,7 +42,7 @@ thread.pool.await.time=30
 agent.local.ip=127.0.0.1
 agent.local.uuid=
 agent.local.uuid.open=false
-agent.node.tag=default_tag
+agent.node.group=default_group
 agent.enable.oom.exit=false
 agent.custom.fixed.ip=
 # max capacity of memory channel
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
index 31d1eee65..3a54b714f 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
@@ -76,9 +76,9 @@ public class HeartbeatMsg {
     private String clusterTag;
 
     /**
-      * Tag of node, separated by commas(,)
+      * Group of node for filtering stream source collect task, separated by commas(,)
       */
-    private String nodeTag;
+    private String nodeGroup;
 
     /**
      * Ext tag of cluster, key=value pairs seperated by &
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index a1be17a92..d65660f3e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.client.api;
 import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -261,12 +260,4 @@ public interface InlongClient {
      * @return whether succeed
      */
     Boolean deleteNode(Integer id);
-
-    /**
-     * Bind or unbind cluster tag node for cluster node.
-     *
-     * @param request cluster info to be modified
-     * @return whether succeed
-     */
-    Boolean bindNodeTag(ClusterNodeBindTagRequest request);
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 69a538eea..c8f988e44 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -39,7 +39,6 @@ import org.apache.inlong.manager.common.util.HttpUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -288,11 +287,6 @@ public class InlongClientImpl implements InlongClient {
         return clusterClient.deleteNode(id);
     }
 
-    @Override
-    public Boolean bindNodeTag(ClusterNodeBindTagRequest request) {
-        return clusterClient.bindNodeTag(request);
-    }
-
     private SimpleGroupStatus recheckGroupStatus(SimpleGroupStatus groupStatus, List<StreamSource> sources) {
         Map<SimpleSourceStatus, List<StreamSource>> statusListMap = Maps.newHashMap();
         sources.forEach(source -> {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
index ecf271102..7140bf595 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
@@ -23,7 +23,6 @@ import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -297,16 +296,4 @@ public class InlongClusterClient {
         ClientUtils.assertRespSuccess(response);
         return response.getData();
     }
-
-    /**
-     * Bind or unbind cluster tag node for cluster node.
-     *
-     * @param request cluster info to be modified
-     * @return whether succeed
-     */
-    public Boolean bindNodeTag(ClusterNodeBindTagRequest request) {
-        Response<Boolean> response = ClientUtils.executeHttpCall(inlongClusterApi.bindNodeTag(request));
-        ClientUtils.assertRespSuccess(response);
-        return response.getData();
-    }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
index 79b98bf9b..776aacec0 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.client.api.service;
 
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -99,7 +98,4 @@ public interface InlongClusterApi {
 
     @DELETE("cluster/node/delete/{id}")
     Call<Response<Boolean>> deleteNode(@Path("id") Integer id);
-
-    @POST("cluster/node/bindTag")
-    Call<Response<Boolean>> bindNodeTag(@Body ClusterNodeBindTagRequest request);
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
similarity index 50%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
index a084f72fb..85da5a876 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
@@ -15,37 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.dao.entity;
-
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.Date;
+package org.apache.inlong.manager.common.consts;
 
 /**
- * Inlong cluster node entity, including parent id, type, ip, etc.
+ * Constant class for agent ext params
  */
-@Data
-public class InlongClusterNodeEntity implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-    private Integer id;
-    private Integer parentId;
-    private String type;
-    private String ip;
-    private Integer port;
-    private String protocolType;
-    private Integer nodeLoad;
-    private String nodeTags;
-    private String extParams;
-    private String description;
-
-    private Integer status;
-    private Integer isDeleted;
-    private String creator;
-    private String modifier;
-    private Date createTime;
-    private Date modifyTime;
-    private Integer version;
+public class AgentConstants {
 
+    public static final String AGENT_GROUP_KEY = "agentGroup";
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
index a084f72fb..b57aa6d26 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
@@ -36,7 +36,6 @@ public class InlongClusterNodeEntity implements Serializable {
     private Integer port;
     private String protocolType;
     private Integer nodeLoad;
-    private String nodeTags;
     private String extParams;
     private String description;
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index c4de96c0f..561270f63 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -42,7 +42,7 @@ public class StreamSourceEntity implements Serializable {
 
     private String dataNodeName;
     private String inlongClusterName;
-    private String inlongClusterNodeTag;
+    private String inlongClusterNodeGroup;
     private String serializationType;
     private String snapshot;
     private Date reportTime;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 491dba3cd..6465d3c91 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -28,7 +28,6 @@
         <result column="port" jdbcType="INTEGER" property="port"/>
         <result column="protocol_type" jdbcType="VARCHAR" property="protocolType"/>
         <result column="node_load" jdbcType="INTEGER" property="nodeLoad"/>
-        <result column="node_tags" jdbcType="VARCHAR" property="nodeTags"/>
         <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
         <result column="description" jdbcType="VARCHAR" property="description"/>
         <result column="status" jdbcType="INTEGER" property="status"/>
@@ -40,7 +39,7 @@
         <result column="version" jdbcType="INTEGER" property="version"/>
     </resultMap>
     <sql id="Base_Column_List">
-        id, parent_id, type, ip, port, protocol_type, node_load, node_tags, ext_params, description,
+        id, parent_id, type, ip, port, protocol_type, node_load, ext_params, description,
         status, is_deleted, creator, modifier, create_time, modify_time, version
     </sql>
 
@@ -48,12 +47,12 @@
             parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
         insert into inlong_cluster_node (id, parent_id, type,
                                          ip, port, protocol_type,
-                                         node_load, node_tags, ext_params,
+                                         node_load, ext_params,
                                          description, status,
                                          creator, modifier)
         values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR},
                 #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
-                #{nodeLoad,jdbcType=INTEGER}, #{nodeTags,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+                #{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
                 #{description, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
                 #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
     </insert>
@@ -62,14 +61,13 @@
             parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
         insert into inlong_cluster_node (id, parent_id, type,
                                          ip, port, protocol_type,
-                                         node_load, node_tags, ext_params, status,
+                                         node_load, ext_params, status,
                                          creator, modifier)
         values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR},
                 #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
-                #{nodeLoad,jdbcType=INTEGER}, #{nodeTags,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+                #{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
                 #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
         ON DUPLICATE KEY UPDATE node_load  = VALUES(node_load),
-                                node_tags  = VALUES(node_tags),
                                 ext_params = VALUES(ext_params),
                                 status     = VALUES(status),
                                 modifier   = VALUES(modifier)
@@ -143,7 +141,6 @@
             port          = #{port,jdbcType=INTEGER},
             protocol_type = #{protocolType,jdbcType=VARCHAR},
             node_load     = #{nodeLoad,jdbcType=INTEGER},
-            node_tags     = #{nodeTags,jdbcType=VARCHAR},
             ext_params    = #{extParams,jdbcType=LONGVARCHAR},
             description   = #{description,jdbcType=VARCHAR},
             status        = #{status,jdbcType=INTEGER},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 7ba947b1f..2858e986a 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -31,7 +31,7 @@
         <result column="uuid" jdbcType="VARCHAR" property="uuid"/>
         <result column="data_node_name" jdbcType="VARCHAR" property="dataNodeName"/>
         <result column="inlong_cluster_name" jdbcType="VARCHAR" property="inlongClusterName"/>
-        <result column="inlong_cluster_node_tag" jdbcType="VARCHAR" property="inlongClusterNodeTag"/>
+        <result column="inlong_cluster_node_group" jdbcType="VARCHAR" property="inlongClusterNodeGroup"/>
         <result column="serialization_type" jdbcType="VARCHAR" property="serializationType"/>
         <result column="snapshot" jdbcType="LONGVARCHAR" property="snapshot"/>
         <result column="report_time" jdbcType="TIMESTAMP" property="reportTime"/>
@@ -47,7 +47,7 @@
     </resultMap>
     <sql id="Base_Column_List">
         id, inlong_group_id, inlong_stream_id, source_type, source_name, template_id, agent_ip, uuid,
-        data_node_name, inlong_cluster_name, inlong_cluster_node_tag, serialization_type, snapshot, report_time,
+        data_node_name, inlong_cluster_name, inlong_cluster_node_group, serialization_type, snapshot, report_time,
         ext_params, version, status, previous_status, is_deleted, creator, modifier, create_time, modify_time
     </sql>
 
@@ -55,14 +55,14 @@
             parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         insert into stream_source (inlong_group_id, inlong_stream_id,
                                    source_type, source_name, template_id, agent_ip,
-                                   uuid, data_node_name, inlong_cluster_name, inlong_cluster_node_tag,
+                                   uuid, data_node_name, inlong_cluster_name, inlong_cluster_node_group,
                                    serialization_type, snapshot,
                                    report_time, ext_params, status,
                                    previous_status, creator, modifier)
         values (#{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER},
                 #{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR}, #{dataNodeName,jdbcType=VARCHAR},
-                #{inlongClusterName,jdbcType=VARCHAR}, #{inlongClusterNodeTag,jdbcType=VARCHAR},
+                #{inlongClusterName,jdbcType=VARCHAR}, #{inlongClusterNodeGroup,jdbcType=VARCHAR},
                 #{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},
                 #{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
                 #{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
@@ -337,8 +337,8 @@
             <if test="inlongClusterName != null">
                 inlong_cluster_name = #{inlongClusterName,jdbcType=INTEGER},
             </if>
-            <if test="inlongClusterNodeTag != null">
-                inlong_cluster_node_tag = #{inlongClusterNodeTag,jdbcType=INTEGER},
+            <if test="inlongClusterNodeGroup != null">
+                inlong_cluster_node_group = #{inlongClusterNodeGroup,jdbcType=VARCHAR},
             </if>
             <if test="serializationType != null">
                 serialization_type = #{serializationType,jdbcType=VARCHAR},
@@ -382,7 +382,7 @@
             uuid                = #{uuid,jdbcType=VARCHAR},
             data_node_name      = #{dataNodeName,jdbcType=VARCHAR},
             inlong_cluster_name = #{inlongClusterName,jdbcType=VARCHAR},
-            inlong_cluster_node_tag = #{inlongClusterNodeTag,jdbcType=VARCHAR},
+            inlong_cluster_node_group = #{inlongClusterNodeGroup,jdbcType=VARCHAR},
             serialization_type  = #{serializationType,jdbcType=VARCHAR},
             snapshot            = #{snapshot,jdbcType=LONGVARCHAR},
             report_time         = #{reportTime,jdbcType=TIMESTAMP},
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindTagRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeBindGroupRequest.java
similarity index 72%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindTagRequest.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeBindGroupRequest.java
index ad6fe0673..493c03ab4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindTagRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeBindGroupRequest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.cluster;
+package org.apache.inlong.manager.pojo.cluster.agent;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
@@ -25,24 +25,21 @@ import javax.validation.constraints.NotBlank;
 import java.util.List;
 
 /**
- * Inlong cluster node bind or unbind tag request
+ * Inlong cluster node bind or unbind group request. The group distinguishes which stream source tasks are collected
  */
 @Data
-@ApiModel("Cluster node bind and unbind tag request")
-public class ClusterNodeBindTagRequest {
+@ApiModel("Cluster node bind and unbind stream source label request, stream source label is a filter to judge "
+        + "whether to accept the stream source task")
+public class AgentClusterNodeBindGroupRequest {
 
-    @NotBlank(message = "Cluster nodeTag cannot be blank")
-    @ApiModelProperty(value = "Cluster node tag")
-    private String clusterNodeTag;
+    @NotBlank(message = "Cluster agent group cannot be blank")
+    @ApiModelProperty(value = "Cluster agent group")
+    private String agentGroup;
 
     @NotBlank(message = "clusterName cannot be blank")
     @ApiModelProperty(value = "Cluster name")
     private String clusterName;
 
-    @NotBlank(message = "type cannot be blank")
-    @ApiModelProperty(value = "Cluster type, including AGENT, DATAPROXY, etc.")
-    private String type;
-
     @ApiModelProperty(value = "Cluster node ip list which needs to bind tag")
     private List<String> bindClusterNodes;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index 099ab45e7..f0adcdd5e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -81,10 +81,10 @@ public class SourceRequest {
     @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongClusterName;
 
-    @ApiModelProperty("Inlong cluster node tag")
+    @ApiModelProperty("Inlong cluster node label for filtering stream source collect task")
     @Length(min = 1, max = 128, message = "length must be between 1 and 128")
     @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
-    private String inlongClusterNodeTag;
+    private String inlongClusterNodeGroup;
 
     @ApiModelProperty("Data node name")
     @Length(max = 128, message = "length must be less than or equal to 128")
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 8ecccbc8b..fac486088 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -21,7 +21,6 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -386,15 +385,6 @@ public interface InlongClusterService {
      */
     Boolean deleteNode(Integer id, String operator);
 
-    /**
-     * Bind or unbind cluster tag node for cluster node.
-     *
-     * @param request cluster info to be modified
-     * @param operator current operator
-     * @return whether succeed
-     */
-    Boolean bindNodeTag(ClusterNodeBindTagRequest request, String operator);
-
     /**
      * Delete cluster node.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index a76ec6dcd..598dd10a0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -54,7 +54,6 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.UserEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -1390,60 +1389,6 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         return true;
     }
 
-    @Override
-    public Boolean bindNodeTag(ClusterNodeBindTagRequest request, String operator) {
-        HashSet<String> bindSet = Sets.newHashSet();
-        HashSet<String> unbindSet = Sets.newHashSet();
-        if (request.getBindClusterNodes() != null) {
-            bindSet.addAll(request.getBindClusterNodes());
-        }
-        if (request.getUnbindClusterNodes() != null) {
-            unbindSet.addAll(request.getUnbindClusterNodes());
-        }
-        Preconditions.checkTrue(Sets.union(bindSet, unbindSet).size() == bindSet.size() + unbindSet.size(),
-                "can not add and del node tag in the sameTime");
-        InlongClusterEntity cluster = clusterMapper.selectByNameAndType(request.getClusterName(), request.getType());
-        String message = "Current user does not have permission to bind cluster node tag";
-        checkUser(cluster, operator, message);
-
-        if (CollectionUtils.isNotEmpty(bindSet)) {
-            bindSet.stream().flatMap(clusterNode -> {
-                ClusterPageRequest pageRequest = new ClusterPageRequest();
-                pageRequest.setParentId(cluster.getId());
-                pageRequest.setType(request.getType());
-                pageRequest.setKeyword(clusterNode);
-                return clusterNodeMapper.selectByCondition(pageRequest).stream();
-            }).filter(entity -> entity != null)
-                    .forEach(entity -> {
-                        String nodeTags = entity.getNodeTags();
-                        Set<String> tagSet = nodeTags == null ? Sets.newHashSet()
-                                : Sets.newHashSet(entity.getNodeTags().split(InlongConstants.COMMA));
-                        tagSet.add(request.getClusterNodeTag());
-                        entity.setNodeTags(String.join(InlongConstants.COMMA, tagSet));
-                        clusterNodeMapper.updateById(entity);
-                    });
-        }
-
-        if (CollectionUtils.isNotEmpty(unbindSet)) {
-            unbindSet.stream().flatMap(clusterNode -> {
-                ClusterPageRequest pageRequest = new ClusterPageRequest();
-                pageRequest.setParentId(cluster.getId());
-                pageRequest.setType(request.getType());
-                pageRequest.setKeyword(clusterNode);
-                return clusterNodeMapper.selectByCondition(pageRequest).stream();
-            }).filter(entity -> entity != null)
-                    .forEach(entity -> {
-                        String nodeTags = entity.getNodeTags();
-                        Set<String> tagSet = nodeTags == null ? Sets.newHashSet()
-                                : Sets.newHashSet(entity.getNodeTags().split(InlongConstants.COMMA));
-                        tagSet.remove(request.getClusterNodeTag());
-                        entity.setNodeTags(String.join(InlongConstants.COMMA, tagSet));
-                        clusterNodeMapper.updateById(entity);
-                    });
-        }
-        return true;
-    }
-
     @Override
     public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolType) {
         LOGGER.debug("begin to get data proxy nodes for groupId={}, protocol={}", groupId, protocolType);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
index e7db76937..099ebca72 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.core;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
 
 /**
  * The service interface for agent
@@ -49,4 +50,11 @@ public interface AgentService {
      */
     TaskResult getTaskResult(TaskRequest request);
 
+    /**
+     * Divide the agent into different groups, which collect different stream source tasks.
+     *
+     * @param request Request of the bind group.
+     * @return Whether succeed.
+     */
+    Boolean bindGroup(AgentClusterNodeBindGroupRequest request);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index f6e1776ce..0efd42c32 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.core.impl;
 
 import com.google.common.collect.Lists;
+import com.google.gson.Gson;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -33,6 +34,7 @@ import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
 import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+import org.apache.inlong.manager.common.consts.AgentConstants;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.manager.common.consts.SourceType;
@@ -54,6 +56,7 @@ import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
 import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
@@ -75,6 +78,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -92,6 +96,7 @@ public class AgentServiceImpl implements AgentService {
     private static final int ISSUED_STATUS = 3;
     private static final int MODULUS_100 = 100;
     private static final int TASK_FETCH_SIZE = 2;
+    private static final Gson GSON = new Gson();
 
     @Autowired
     private StreamSourceEntityMapper sourceMapper;
@@ -192,6 +197,66 @@ public class AgentServiceImpl implements AgentService {
         return TaskResult.builder().dataConfigs(tasks).cmdConfigs(cmdConfigs).build();
     }
 
+    /**
+     * Binds and unbinds the requested agent group on nodes of an AGENT cluster.
+     *
+     * @param request cluster name, agent group, and the node keywords to bind and to unbind
+     * @return always true; a missing cluster or a node in both lists is rejected by Preconditions
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
+    public Boolean bindGroup(AgentClusterNodeBindGroupRequest request) {
+        HashSet<String> bindSet = Sets.newHashSet();
+        HashSet<String> unbindSet = Sets.newHashSet();
+        if (request.getBindClusterNodes() != null) {
+            bindSet.addAll(request.getBindClusterNodes());
+        }
+        if (request.getUnbindClusterNodes() != null) {
+            unbindSet.addAll(request.getUnbindClusterNodes());
+        }
+        Preconditions.checkTrue(Sets.union(bindSet, unbindSet).size() == bindSet.size() + unbindSet.size(),
+                "can not add and del node tag in the sameTime");
+        InlongClusterEntity cluster = clusterMapper.selectByNameAndType(request.getClusterName(), ClusterType.AGENT);
+        // Reject an unknown cluster up front instead of hitting a NullPointerException on cluster.getId().
+        Preconditions.checkNotNull(cluster, "agent cluster not found by name=" + request.getClusterName());
+
+        bindSet.forEach(node -> updateNodeGroups(cluster, node, request.getAgentGroup(), true));
+        unbindSet.forEach(node -> updateNodeGroups(cluster, node, request.getAgentGroup(), false));
+        return true;
+    }
+
+    /**
+     * Adds or removes one agent group on every AGENT node that the keyword query returns for the cluster.
+     *
+     * @param cluster parent cluster of the nodes
+     * @param nodeKeyword keyword used to select the nodes
+     * @param agentGroup group to add or remove
+     * @param bind true to add the group, false to remove it
+     */
+    private void updateNodeGroups(InlongClusterEntity cluster, String nodeKeyword, String agentGroup, boolean bind) {
+        ClusterPageRequest pageRequest = new ClusterPageRequest();
+        pageRequest.setParentId(cluster.getId());
+        pageRequest.setType(ClusterType.AGENT);
+        pageRequest.setKeyword(nodeKeyword);
+        clusterNodeMapper.selectByCondition(pageRequest).stream().filter(Objects::nonNull).forEach(entity -> {
+            // Gson returns null for null, blank, or "null" JSON; treat all of them as empty ext params.
+            Map<String, String> parsed = GSON.fromJson(entity.getExtParams(), Map.class);
+            Map<String, String> extParams = parsed == null ? new HashMap<>() : parsed;
+            String storedGroups = StringUtils.defaultString(extParams.get(AgentConstants.AGENT_GROUP_KEY));
+            // Skip blank elements so a previously emptied value ("") never turns into a group name.
+            Set<String> groupSet = Stream.of(storedGroups.split(InlongConstants.COMMA))
+                    .filter(StringUtils::isNotBlank).collect(Collectors.toCollection(HashSet::new));
+            if (bind) {
+                groupSet.add(agentGroup);
+            } else {
+                groupSet.remove(agentGroup);
+            }
+            extParams.put(AgentConstants.AGENT_GROUP_KEY, String.join(InlongConstants.COMMA, groupSet));
+            entity.setExtParams(GSON.toJson(extParams));
+            clusterNodeMapper.insertOnDuplicateKeyUpdate(entity);
+        });
+    }
+
     /**
      * Query the tasks that source is waited to be operated.(only clusterName and ip matched it can be operated)
      *
@@ -251,7 +316,7 @@ public class AgentServiceImpl implements AgentService {
 
     private void preProcessFileTask(TaskRequest taskRequest) {
         preProcessTemplateFileTask(taskRequest);
-        preProcessTagFileTasks(taskRequest);
+        preProcessLabelFileTasks(taskRequest);
     }
 
     /**
@@ -306,7 +371,7 @@ public class AgentServiceImpl implements AgentService {
      *
      * @param taskRequest
      */
-    private void preProcessTagFileTasks(TaskRequest taskRequest) {
+    private void preProcessLabelFileTasks(TaskRequest taskRequest) {
         List<Integer> needProcessedStatusList = Arrays.asList(
                 SourceStatus.SOURCE_NORMAL.getCode(),
                 SourceStatus.SOURCE_FAILED.getCode(),
@@ -329,7 +394,7 @@ public class AgentServiceImpl implements AgentService {
                     Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet(
                             SourceStatus.SOURCE_FROZEN,
                             SourceStatus.TO_BE_ISSUED_FROZEN);
-                    if (!matchTag(sourceEntity, clusterNodeEntity)
+                    if (!matchLabel(sourceEntity, clusterNodeEntity)
                             && !exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))) {
                         LOGGER.info("Transform task({}) from {} to {} because tag mismatch "
                                 + "for agent({}) in cluster({})", sourceEntity.getAgentIp(),
@@ -348,7 +413,7 @@ public class AgentServiceImpl implements AgentService {
                             SourceStatus.TO_BE_ISSUED_ACTIVE);
                     Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet(
                             StreamStatus.SUSPENDED, StreamStatus.SUSPENDED);
-                    if (matchTag(sourceEntity, clusterNodeEntity)
+                    if (matchLabel(sourceEntity, clusterNodeEntity)
                             && !exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))
                             && !exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus()))) {
                         LOGGER.info("Transform task({}) from {} to {} because tag rematch "
@@ -514,20 +579,21 @@ public class AgentServiceImpl implements AgentService {
         }).collect(Collectors.toList());
     }
 
-    private boolean matchTag(StreamSourceEntity sourceEntity, InlongClusterNodeEntity clusterNodeEntity) {
+    private boolean matchLabel(StreamSourceEntity sourceEntity, InlongClusterNodeEntity clusterNodeEntity) {
         Preconditions.checkNotNull(sourceEntity, "cluster must be valid");
-        if (sourceEntity.getInlongClusterNodeTag() == null) {
+        if (sourceEntity.getInlongClusterNodeGroup() == null) {
             return true;
         }
-        if (clusterNodeEntity == null || clusterNodeEntity.getNodeTags() == null) {
+
+        if (clusterNodeEntity == null || clusterNodeEntity.getExtParams() == null) {
             return false;
         }
 
-        Set<String> nodeTags = Stream.of(
-                clusterNodeEntity.getNodeTags().split(InlongConstants.COMMA)).collect(Collectors.toSet());
-        Set<String> sourceTags = Stream.of(
-                sourceEntity.getInlongClusterNodeTag().split(InlongConstants.COMMA)).collect(Collectors.toSet());
-        return sourceTags.stream().anyMatch(sourceTag -> nodeTags.contains(sourceTag));
+        Map<String, String> extParams = GSON.fromJson(clusterNodeEntity.getExtParams(), Map.class);
+        Set<String> clusterNodeLabels = !extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>()
+                : Sets.newHashSet(extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA));
+        Set<String> sourceLabels = Stream.of(
+                sourceEntity.getInlongClusterNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet());
+        return sourceLabels.stream().anyMatch(sourceLabel -> clusterNodeLabels.contains(sourceLabel));
     }
-
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 7ccb7db5d..2fb8d5608 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.RemovalCause;
 import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.gson.Gson;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -30,6 +31,7 @@ import org.apache.inlong.common.enums.NodeSrvStatus;
 import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
 import org.apache.inlong.common.heartbeat.ComponentHeartbeat;
 import org.apache.inlong.common.heartbeat.HeartbeatMsg;
+import org.apache.inlong.manager.common.consts.AgentConstants;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterStatus;
 import org.apache.inlong.manager.common.enums.NodeStatus;
@@ -47,14 +49,21 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Component
 public class HeartbeatManager implements AbstractHeartbeatManager {
 
     private static final String AUTO_REGISTERED = "auto registered";
+    private static final Gson GSON = new Gson();
 
     @Getter
     private Cache<ComponentHeartbeat, HeartbeatMsg> heartbeatCache;
@@ -211,21 +220,30 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         clusterNode.setPort(Integer.valueOf(heartbeat.getPort()));
         clusterNode.setProtocolType(heartbeat.getProtocolType());
         clusterNode.setNodeLoad(heartbeat.getLoad());
-        clusterNode.setNodeTags(heartbeat.getNodeTag());
         clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
         clusterNode.setCreator(creator);
         clusterNode.setModifier(creator);
         clusterNode.setDescription(AUTO_REGISTERED);
+        insertOrUpdateLabel(clusterNode, heartbeat);
         return clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode);
     }
 
     private int updateClusterNode(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) {
         clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
         clusterNode.setNodeLoad(heartbeat.getLoad());
-        clusterNode.setNodeTags(heartbeat.getNodeTag());
+        insertOrUpdateLabel(clusterNode, heartbeat);
         return clusterNodeMapper.updateById(clusterNode);
     }
 
+    private void insertOrUpdateLabel(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) {
+        Set<String> groupSet = heartbeat.getNodeGroup() == null ? new HashSet<>()
+                : Arrays.stream(heartbeat.getNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet());
+        Map<String, String> extParams = clusterNode.getExtParams() == null ? new HashMap<>()
+                : GSON.fromJson(clusterNode.getExtParams(), Map.class);
+        extParams.put(AgentConstants.AGENT_GROUP_KEY, String.join(InlongConstants.COMMA, groupSet));
+        clusterNode.setExtParams(GSON.toJson(extParams));
+    }
+
     private int deleteClusterNode(InlongClusterNodeEntity clusterNode) {
         return clusterNodeMapper.deleteById(clusterNode.getId());
     }
@@ -281,6 +299,6 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         if (oldHB == null) {
             return true;
         }
-        return oldHB.getNodeTag() != newHB.getNodeTag() || oldHB.getLoad() != newHB.getLoad();
+        return oldHB.getNodeGroup() != newHB.getNodeGroup() || oldHB.getLoad() != newHB.getLoad();
     }
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index f509f934b..2f85d1c9a 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -78,7 +78,7 @@ class AgentServiceTest extends ServiceBaseTest {
     private InlongStreamEntityMapper streamMapper;
 
     private List<Pair<String, String>> groupStreamCache;
-    private List<String> tagCache;
+    private List<String> groupCache;
 
     /**
      * Save template source
@@ -110,7 +110,7 @@ class AgentServiceTest extends ServiceBaseTest {
     /**
      * mock {@link StreamSourceService#save}
      */
-    public Pair<String, String> saveSource(String tag) {
+    public Pair<String, String> saveSource(String group) {
         String groupId = UUID.randomUUID().toString();
         String streamId = UUID.randomUUID().toString();
         groupStreamCache.add(new ImmutablePair<>(groupId, streamId));
@@ -121,9 +121,9 @@ class AgentServiceTest extends ServiceBaseTest {
         sourceInfo.setInlongStreamId(streamId);
         sourceInfo.setSourceType(SourceType.FILE);
         sourceInfo.setInlongClusterName(MockAgent.CLUSTER_NAME);
-        sourceInfo.setInlongClusterNodeTag(tag);
+        sourceInfo.setInlongClusterNodeGroup(group);
         sourceInfo.setSourceName(
-                String.format("Source task for cluster(%s) and tag(%s)", MockAgent.CLUSTER_NAME, tag));
+                String.format("Source task for cluster(%s) and group(%s)", MockAgent.CLUSTER_NAME, group));
         sourceService.save(sourceInfo, GLOBAL_OPERATOR);
         sourceService.updateStatus(
                 groupId,
@@ -170,7 +170,7 @@ class AgentServiceTest extends ServiceBaseTest {
     @BeforeEach
     public void setupEach() {
         groupStreamCache = new ArrayList<>();
-        tagCache = new ArrayList<>();
+        groupCache = new ArrayList<>();
     }
 
     @AfterEach
@@ -184,27 +184,27 @@ class AgentServiceTest extends ServiceBaseTest {
                     groupStreamCache.stream().map(Pair::getValue).collect(Collectors.toList()));
         }
         groupStreamCache.clear();
-        tagCache.stream().forEach(tag -> bindTag(false, tag));;
+        groupCache.stream().forEach(group -> bindGroup(false, group));;
     }
 
-    private void bindTag(boolean bind, String tag) {
+    private void bindGroup(boolean bind, String group) {
         if (bind) {
-            tagCache.add(tag);
+            groupCache.add(group);
         }
-        agent.bindTag(bind, tag);
+        agent.bindGroup(bind, group);
     }
 
     /**
-     * Test bind tag for node.
+     * Test bind group for node.
      */
     @Test
-    public void testTagMatch() {
-        saveSource("tag1,tag3");
-        saveSource("tag2,tag3");
-        saveSource("tag2,tag3");
-        saveSource("tag4");
-        bindTag(true, "tag1");
-        bindTag(true, "tag2");
+    public void testGroupMatch() {
+        saveSource("group1,group3");
+        saveSource("group2,group3");
+        saveSource("group2,group3");
+        saveSource("group4");
+        bindGroup(true, "group1");
+        bindGroup(true, "group2");
 
         TaskResult taskResult = agent.pullTask();
         Assertions.assertTrue(taskResult.getCmdConfigs().isEmpty());
@@ -220,12 +220,12 @@ class AgentServiceTest extends ServiceBaseTest {
     }
 
     /**
-     * Test node tag mismatch source task and next time rematch source task.
+     * Test node group mismatch source task and next time rematch source task.
      */
     @Test
-    public void testTagMismatchAndRematch() {
-        final Pair<String, String> groupStream = saveSource("tag1,tag3");
-        bindTag(true, "tag1");
+    public void testGroupMismatchAndRematch() {
+        final Pair<String, String> groupStream = saveSource("group1,group3");
+        bindGroup(true, "group1");
 
         agent.pullTask();
         agent.pullTask(); // report last success status
@@ -235,8 +235,8 @@ class AgentServiceTest extends ServiceBaseTest {
                 .findAny()
                 .get()
                 .getId();
-        // unbind tag and mismatch
-        bindTag(false, "tag1");
+        // unbind group and mismatch
+        bindGroup(false, "group1");
         TaskResult t1 = agent.pullTask();
         Assertions.assertEquals(1, t1.getDataConfigs().size());
         Assertions.assertEquals(1, t1.getDataConfigs().stream()
@@ -246,8 +246,8 @@ class AgentServiceTest extends ServiceBaseTest {
         DataConfig d1 = t1.getDataConfigs().get(0);
         Assertions.assertEquals(sourceId, d1.getTaskId());
 
-        // bind tag and rematch
-        bindTag(true, "tag1");
+        // bind group and rematch
+        bindGroup(true, "group1");
         TaskResult t2 = agent.pullTask();
         Assertions.assertEquals(1, t2.getDataConfigs().size());
         Assertions.assertEquals(1, t2.getDataConfigs().stream()
@@ -263,14 +263,14 @@ class AgentServiceTest extends ServiceBaseTest {
      */
     @Test
     public void testSuspendFailWhenNotAck() {
-        Pair<String, String> groupStream = saveSource("tag1,tag3");
-        bindTag(true, "tag1");
+        Pair<String, String> groupStream = saveSource("group1,group3");
+        bindGroup(true, "group1");
 
         agent.pullTask();
         agent.pullTask(); // report last success status
 
         // mismatch
-        bindTag(false, "tag1");
+        bindGroup(false, "group1");
         agent.pullTask();
 
         // suspend
@@ -282,21 +282,21 @@ class AgentServiceTest extends ServiceBaseTest {
     }
 
     /**
-     * Test node tag rematch source task but group suspend
+     * Test node group rematch source task but group suspend
      */
     @Test
     public void testRematchedWhenSuspend() {
-        final Pair<String, String> groupStream = saveSource("tag1,tag3");
-        bindTag(true, "tag1");
+        final Pair<String, String> groupStream = saveSource("group1,group3");
+        bindGroup(true, "group1");
 
         agent.pullTask();
         agent.pullTask(); // report last success status
 
         // mismatch and rematch
-        bindTag(false, "tag1");
+        bindGroup(false, "group1");
         agent.pullTask();
         agent.pullTask(); // report last to make it from 304 -> 104
-        bindTag(true, "tag1");
+        bindGroup(true, "group1");
 
         // suspend
         suspendSource(groupStream.getLeft(), groupStream.getRight());
@@ -305,12 +305,12 @@ class AgentServiceTest extends ServiceBaseTest {
     }
 
     /**
-     * Test node tag mismatch source task but group restart
+     * Test node group mismatch source task but group restart
      */
     @Test
     public void testMismatchedWhenRestart() {
-        final Pair<String, String> groupStream = saveSource("tag1,tag3");
-        bindTag(true, "tag1");
+        final Pair<String, String> groupStream = saveSource("group1,group3");
+        bindGroup(true, "group1");
 
         agent.pullTask();
         agent.pullTask(); // report last success status
@@ -318,7 +318,7 @@ class AgentServiceTest extends ServiceBaseTest {
         // suspend and restart
         suspendSource(groupStream.getLeft(), groupStream.getRight());
         restartSource(groupStream.getLeft(), groupStream.getRight());
-        bindTag(false, "tag1");
+        bindGroup(false, "group1");
         TaskResult taskResult = agent.pullTask();
         Assertions.assertEquals(1, taskResult.getDataConfigs().size());
         Assertions.assertEquals(1, taskResult.getDataConfigs().stream()
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java
index 2acb651f5..9e0061d18 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java
@@ -44,7 +44,7 @@ public class MockAgent {
     // 2. Regularly report the previously executed tasks to the manager (may be successful or fail)
     public static final String LOCAL_IP = "127.0.0.1";
     public static final String LOCAL_PORT = "8008";
-    public static final String LOCAL_TAG = "default_tag";
+    public static final String LOCAL_GROUP = "default_group";
     public static final String CLUSTER_TAG = "default_cluster_tag";
     public static final String CLUSTER_NAME = "1c59ef9e8e1e43cfb25ee8b744c9c81b_2790";
 
@@ -52,7 +52,7 @@ public class MockAgent {
     private HeartbeatService heartbeatService;
 
     private Queue<CommandEntity> commands = new LinkedList<>();
-    private Set<String> tags = Sets.newHashSet(LOCAL_TAG);
+    private Set<String> groups = Sets.newHashSet(LOCAL_GROUP);
     private int jobLimit;
 
     public MockAgent(AgentService agentService, HeartbeatService heartbeatService, int jobLimit) {
@@ -83,17 +83,17 @@ public class MockAgent {
         heartbeat.setComponentType(ComponentTypeEnum.Agent.getType());
         heartbeat.setClusterName(CLUSTER_NAME);
         heartbeat.setClusterTag(CLUSTER_TAG);
-        heartbeat.setNodeTag(tags.stream().collect(Collectors.joining(InlongConstants.COMMA)));
+        heartbeat.setNodeGroup(groups.stream().collect(Collectors.joining(InlongConstants.COMMA)));
         heartbeat.setInCharges(GLOBAL_OPERATOR);
         heartbeat.setReportTime(System.currentTimeMillis());
         heartbeatService.reportHeartbeat(heartbeat);
     }
 
-    public void bindTag(boolean bind, String tag) {
+    public void bindGroup(boolean bind, String group) {
         if (bind) {
-            tags.add(tag);
+            groups.add(group);
         } else {
-            tags.remove(tag);
+            groups.remove(group);
         }
         sendHeartbeat();
     }
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 6af822e01..fc4938469 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -133,7 +133,6 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
     `port`          int(6)       NULL COMMENT 'Cluster port',
     `protocol_type` varchar(20)           DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
     `node_load`     int(11)               DEFAULT '-1' COMMENT 'Current load value of the node',
-    `node_tags`     varchar(512)          DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip',
     `ext_params`    mediumtext            DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
     `description`   varchar(256)          DEFAULT '' COMMENT 'Description of cluster node',
     `status`        int(4)                DEFAULT '0' COMMENT 'Cluster status',
@@ -330,7 +329,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `uuid`                varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
     `data_node_name`      varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
     `inlong_cluster_name` varchar(128)          DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
-    `inlong_cluster_node_tag` varchar(512)      DEFAULT NULL COMMENT 'Cluster node tag',
+    `inlong_cluster_node_group` varchar(512)      DEFAULT NULL COMMENT 'Cluster node group',
     `serialization_type`  varchar(20)           DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
     `snapshot`            mediumtext            DEFAULT NULL COMMENT 'Snapshot of this source task',
     `report_time`         timestamp    NULL COMMENT 'Snapshot time',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index f8bcb47fb..3036a7525 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -143,7 +143,6 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
     `port`          int(6)       NULL COMMENT 'Cluster port',
     `protocol_type` varchar(20)           DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
     `node_load`     int(11)               DEFAULT '-1' COMMENT 'Current load value of the node',
-    `node_tags`     varchar(512)          DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip',
     `ext_params`    mediumtext            DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
     `description`   varchar(256)          DEFAULT '' COMMENT 'Description of cluster node',
     `status`        int(4)                DEFAULT '0' COMMENT 'Cluster status',
@@ -347,7 +346,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `uuid`                varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
     `data_node_name`      varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
     `inlong_cluster_name` varchar(128)          DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
-    `inlong_cluster_node_tag` varchar(512)      DEFAULT NULL COMMENT 'Cluster node tag',
+    `inlong_cluster_node_group` varchar(512)      DEFAULT NULL COMMENT 'Cluster node group',
     `serialization_type`  varchar(20)           DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
     `snapshot`            mediumtext            DEFAULT NULL COMMENT 'Snapshot of this source task',
     `report_time`         timestamp    NULL COMMENT 'Snapshot time',
diff --git a/inlong-manager/manager-web/sql/changes-1.5.0.sql b/inlong-manager/manager-web/sql/changes-1.5.0.sql
index 4b655bc2c..88ed142bd 100644
--- a/inlong-manager/manager-web/sql/changes-1.5.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.5.0.sql
@@ -32,9 +32,5 @@ ALTER TABLE `inlong_cluster_node`
     ADD COLUMN `node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node';
 
 
-ALTER TABLE `inlong_cluster_node`
-    ADD COLUMN `node_tags` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip';
-
-
 ALTER TABLE `stream_source`
-    ADD COLUMN `inlong_cluster_node_tag` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag';
\ No newline at end of file
+    ADD COLUMN `inlong_cluster_node_group` varchar(512) DEFAULT NULL COMMENT 'Cluster node group';
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
index 3ea4634d3..df49e13ea 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
@@ -25,7 +25,6 @@ import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -228,14 +227,6 @@ public class InlongClusterController {
         return Response.success(clusterService.deleteNode(id, LoginUserUtils.getLoginUser().getName()));
     }
 
-    @RequestMapping(value = "/cluster/node/bindTag")
-    @OperationLog(operation = OperationType.UPDATE)
-    @ApiOperation(value = "Bind or unbind cluster node tag")
-    public Response<Boolean> bindNodeTag(@Validated @RequestBody ClusterNodeBindTagRequest request) {
-        String username = LoginUserUtils.getLoginUser().getName();
-        return Response.success(clusterService.bindNodeTag(request, username));
-    }
-
     @PostMapping("/cluster/testConnection")
     @ApiOperation(value = "Test connection for inlong cluster")
     public Response<Boolean> testConnection(@RequestBody ClusterRequest request) {
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
index 88343f940..5e2270637 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.AgentService;
@@ -68,4 +69,9 @@ public class AgentController {
         return Response.success(agentService.getTaskResult(request));
     }
 
+    @PostMapping("/agent/bindGroup")
+    @ApiOperation(value = "Divide the agent into different groups, which collect different stream source tasks.")
+    public Response<Boolean> bindGroup(@RequestBody AgentClusterNodeBindGroupRequest request) {
+        return Response.success(agentService.bindGroup(request));
+    }
 }


[inlong] 01/05: [INLONG-7156][Agent] Support directly sending raw file data (#7157)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit e26610ff1de43db5ecbe1eb548724630e9d3ee7e
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Thu Jan 5 16:32:24 2023 +0800

    [INLONG-7156][Agent] Support directly sending raw file data (#7157)
---
 .../sources/reader/file/FileReaderOperator.java    |  8 ++++-
 .../apache/inlong/agent/plugin/TestFileAgent.java  |  2 +-
 .../agent/plugin/sources/TestTextFileReader.java   | 34 +++++++++++++++++++++-
 .../inlong/agent/plugin/task/TestTextFileTask.java |  6 +++-
 .../agent-plugins/src/test/resources/test/3.txt    |  5 ++++
 5 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 32ede5da4..583632877 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -21,11 +21,11 @@ import com.google.gson.Gson;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Validator;
-import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.plugin.sources.reader.AbstractReader;
 import org.apache.inlong.agent.plugin.utils.FileDataUtils;
 import org.apache.inlong.agent.plugin.validator.PatternValidator;
@@ -104,6 +104,7 @@ public class FileReaderOperator extends AbstractReader {
 
     private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
     private final StringBuffer sb = new StringBuffer();
+    private boolean needMetadata = false;
 
     public FileReaderOperator(File file, int position) {
         this(file, position, "");
@@ -261,6 +262,9 @@ public class FileReaderOperator extends AbstractReader {
     }
 
     public String metadataMessage(String message) {
+        if (!needMetadata) {
+            return message;
+        }
         long timestamp = System.currentTimeMillis();
         boolean isJson = FileDataUtils.isJSON(message);
         Map<String, String> mergeData = new HashMap<>(metadata);
@@ -280,8 +284,10 @@ public class FileReaderOperator extends AbstractReader {
         String[] env = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA);
         Arrays.stream(env).forEach(data -> {
             if (data.equalsIgnoreCase(KUBERNETES)) {
+                needMetadata = true;
                 new KubernetesMetadataProvider(this).getData();
             } else if (data.equalsIgnoreCase(ENV_CVM)) {
+                needMetadata = true;
                 metadata.put(METADATA_HOST_NAME, AgentUtils.getLocalHost());
                 metadata.put(METADATA_SOURCE_IP, AgentUtils.fetchLocalIp());
                 metadata.put(METADATA_FILE_NAME, file.getName());
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index d0bf5d99d..011f70bf7 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -169,7 +169,7 @@ public class TestFileAgent {
         await().atMost(10, TimeUnit.SECONDS).until(() -> {
             Map<String, JobWrapper> jobs = agent.getManager().getJobManager().getJobs();
             return jobs.size() == 1
-                    && jobs.values().stream().collect(Collectors.toList()).get(0).getAllTasks().size() == 4;
+                    && jobs.values().stream().collect(Collectors.toList()).get(0).getAllTasks().size() == 5;
         });
     }
 
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index 6159aa219..f89bf0355 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -56,13 +57,15 @@ import java.util.stream.Stream;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERNS;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER_TYPE;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER_TYPE;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_GROUP_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES;
 
 @PowerMockIgnore({"javax.management.*", "javax.script.*", "com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*",
         "org.w3c.*"})
@@ -135,6 +138,33 @@ public class TestTextFileReader {
         }
     }
 
+    @Test
+    public void testFileRowDataRead() throws URISyntaxException {
+        URI uri = getClass().getClassLoader().getResource("test").toURI();
+        JobProfile jobConfiguration = JobProfile.parseJsonStr("{}");
+        String mainPath = Paths.get(uri).toString();
+        jobConfiguration.set(JOB_DIR_FILTER_PATTERNS, Paths.get(mainPath,
+                "3.txt").toFile().getAbsolutePath());
+        jobConfiguration.set(JOB_INSTANCE_ID, "test");
+        jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
+        jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
+        jobConfiguration.set(JOB_GROUP_ID, "groupid");
+        jobConfiguration.set(JOB_STREAM_ID, "streamid");
+        TextFileSource fileSource = new TextFileSource();
+        List<Reader> readerList = fileSource.split(jobConfiguration);
+        Assert.assertEquals(1, readerList.size());
+        Reader reader = readerList.get(0);
+        reader.init(jobConfiguration);
+        while (!reader.isFinished()) {
+            Message message = reader.read();
+            if (message == null) {
+                break;
+            }
+            Assert.assertEquals("agent text content test", message.toString());
+        }
+
+    }
+
     /**
      * Custom line end character.
      */
@@ -152,6 +182,7 @@ public class TestTextFileReader {
         jobConfiguration.set(JOB_STREAM_ID, "streamid");
         jobConfiguration.set(JOB_FILE_TRIGGER_TYPE, FileTriggerType.FULL);
         jobConfiguration.set(JOB_FILE_LINE_END_PATTERN, "line-end-symbol");
+        jobConfiguration.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
         TextFileSource fileSource = new TextFileSource();
         List<Reader> readerList = fileSource.split(jobConfiguration);
         Assert.assertEquals(1, readerList.size());
@@ -224,6 +255,7 @@ public class TestTextFileReader {
         jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
         jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
         jobProfile.set(JOB_INSTANCE_ID, "1");
+        jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
         fileReaderOperator.init(jobProfile);
 
         Assert.assertEquals("world", getContent(
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
index cd7c29aa5..e0245a83d 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
@@ -60,6 +60,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES;
 import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -180,7 +182,7 @@ public class TestTextFileTask {
         jobProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, file.getAbsolutePath());
         jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0));
         jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.FULL);
-
+        jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
         // mock data
         final MockSink sink = mockTextTask(jobProfile);
         await().atMost(10, TimeUnit.SECONDS).until(() -> sink.getResult().size() == 100);
@@ -220,6 +222,7 @@ public class TestTextFileTask {
         jobProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, file.getAbsolutePath());
         jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.INCREMENT);
         jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0));
+        jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
 
         // mock data
         final MockSink sink = mockTextTask(jobProfile);
@@ -252,6 +255,7 @@ public class TestTextFileTask {
         jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.FULL);
         jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0));
         jobProfile.set(JobConstants.JOB_FILE_LINE_END_PATTERN, "[0-9]");
+        jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
 
         // mock data
         final MockSink sink = mockTextTask(jobProfile);
diff --git a/inlong-agent/agent-plugins/src/test/resources/test/3.txt b/inlong-agent/agent-plugins/src/test/resources/test/3.txt
new file mode 100644
index 000000000..45d4bba28
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/resources/test/3.txt
@@ -0,0 +1,5 @@
+agent text content test
+agent text content test
+agent text content test
+agent text content test
+agent text content test
\ No newline at end of file