You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/25 03:31:48 UTC

[incubator-inlong] branch master updated: [INLONG-2297][agent] support audit for source and sink (#2307)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 256ebf9  [INLONG-2297][agent] support audit for source and sink (#2307)
256ebf9 is described below

commit 256ebf94057467356c4204b1ff6b367af41f6435
Author: justinwwhuang <hw...@163.com>
AuthorDate: Tue Jan 25 11:31:41 2022 +0800

    [INLONG-2297][agent] support audit for source and sink (#2307)
---
 inlong-agent/agent-common/pom.xml                  |   9 +-
 .../inlong/agent/constants/AgentConstants.java     |  10 +-
 .../inlong/agent/constants/CommonConstants.java    |   2 +
 .../inlong/agent/metrics/audit/AuditUtils.java     |  93 ++++++++++++++++++
 .../org/apache/inlong/agent/utils/ConfigUtil.java  |   5 +-
 .../org/apache/inlong/agent/core/AgentMain.java    |   4 +-
 .../org/apache/inlong/agent/core/AgentManager.java |   3 +-
 .../inlong/agent/plugin/sinks/ProxySink.java       |  50 +++++-----
 .../inlong/agent/plugin/sinks/PulsarSink.java      | 106 ++++++++++++---------
 .../agent/plugin/sources/reader/SqlReader.java     |  21 +++-
 .../plugin/sources/reader/TextFileReader.java      |  21 +++-
 .../apache/inlong/agent/plugin/TestFileAgent.java  |   4 +
 .../agent/plugin/sources/TestTextFileReader.java   |  14 ++-
 13 files changed, 255 insertions(+), 87 deletions(-)

diff --git a/inlong-agent/agent-common/pom.xml b/inlong-agent/agent-common/pom.xml
index d71a0e5..2010cb0 100755
--- a/inlong-agent/agent-common/pom.xml
+++ b/inlong-agent/agent-common/pom.xml
@@ -18,8 +18,8 @@
     under the License.
 -->
 <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xmlns="http://maven.apache.org/POM/4.0.0"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.inlong</groupId>
         <artifactId>inlong-agent</artifactId>
@@ -102,6 +102,11 @@
             <artifactId>lombok</artifactId>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java
index 5b4e7af..2c5bc81 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java
@@ -28,7 +28,9 @@ public class AgentConstants {
     public static final String DEFAULT_AGENT_LOCAL_CACHE = ".local";
 
     public static final String AGENT_LOCAL_CACHE_TIMEOUT = "agent.local.cache.timeout";
-    /** cache timeout in minutes. **/
+    /**
+     * cache timeout in minutes.
+     **/
     public static final int DEFAULT_AGENT_LOCAL_CACHE_TIMEOUT = 30;
 
     public static final String AGENT_LOCAL_STORE_PATH = "agent.localStore.path";
@@ -166,4 +168,10 @@ public class AgentConstants {
     public static final String PROMETHEUS_EXPORTER_PORT = "agent.prometheus.exporter.port";
     public static final int DEFAULT_PROMETHEUS_EXPORTER_PORT = 8080;
 
+    public static final String AUDIT_ENABLE = "agent.audit.enable";
+    public static final boolean DEFAULT_AUDIT_ENABLE = true;
+
+    public static final String AUDIT_KEY_PROXYS = "audit.proxys";
+    public static final String DEFAULT_AUDIT_PROXYS = "";
+
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
index d65ad66..855904c 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
@@ -25,9 +25,11 @@ public class CommonConstants {
     public static final String DEFAULT_PROXY_NET_TAG = "";
 
     public static final String PROXY_INLONG_GROUP_ID = "proxy.inlongGroupId";
+    public static final String DEFAULT_PROXY_INLONG_GROUP_ID = "default_inlong_group_id";
     public static final String POSITION_SUFFIX = ".position";
 
     public static final String PROXY_INLONG_STREAM_ID = "proxy.inlongStreamId";
+    public static final String DEFAULT_PROXY_INLONG_STREAM_ID = "default_inlong_stream_id";
 
     public static final String PROXY_LOCAL_HOST = "proxy.localHost";
     public static final String DEFAULT_PROXY_LOCALHOST = AgentUtils.getLocalIp();
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
new file mode 100644
index 0000000..87baa0e
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics.audit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.util.AuditConfig;
+
+import java.util.HashSet;
+
+import static org.apache.inlong.agent.constants.AgentConstants.AUDIT_ENABLE;
+import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_AUDIT_ENABLE;
+import static org.apache.inlong.agent.constants.AgentConstants.AUDIT_KEY_PROXYS;
+import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_AUDIT_PROXYS;
+
+/**
+ * Static helpers that configure the audit SDK (AuditImp) from the agent configuration and forward audit data to it.
+ */
+public class AuditUtils {
+
+    public static final String AUDIT_KEY_FILE_PATH = "audit.filePath";
+    public static final String AUDIT_DEFAULT_FILE_PATH = "/data/inlong/audit/";
+    public static final String AUDIT_KEY_MAX_CACHE_ROWS = "audit.maxCacheRows";
+    public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
+    public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3;
+    public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4;
+
+    private static boolean IS_AUDIT = true;
+
+    /**
+     * Loads the audit settings from the agent configuration and, when auditing is enabled, applies them to AuditImp.
+     */
+    public static void initAudit() {
+        AgentConfiguration conf = AgentConfiguration.getAgentConf();
+        // audit on/off switch, read from AUDIT_ENABLE with DEFAULT_AUDIT_ENABLE as the fallback
+        IS_AUDIT = conf.getBoolean(AUDIT_ENABLE, DEFAULT_AUDIT_ENABLE);
+        if (IS_AUDIT) {
+            // audit proxy entries: whitespace-separated list; a blank value yields an empty set
+            String strIpPorts = conf.get(AUDIT_KEY_PROXYS, DEFAULT_AUDIT_PROXYS);
+            HashSet<String> proxys = new HashSet<>();
+            if (!StringUtils.isBlank(strIpPorts)) {
+                String[] ipPorts = strIpPorts.split("\\s+");
+                for (String ipPort : ipPorts) {
+                    proxys.add(ipPort);
+                }
+            }
+            AuditImp.getInstance().setAuditProxy(proxys);
+            // file path and max cache rows (with their defaults) handed to the audit SDK as an AuditConfig
+            String filePath = conf.get(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH);
+            int maxCacheRow = conf.getInt(AUDIT_KEY_MAX_CACHE_ROWS, AUDIT_DEFAULT_MAX_CACHE_ROWS);
+            AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
+            AuditImp.getInstance().setAuditConfig(auditConfig);
+        }
+    }
+
+    /**
+     * Forwards one audit entry to AuditImp with fixed trailing arguments 1 and 0; does nothing if auditing is off.
+     *
+     * @param auditID audit item id passed through to AuditImp, e.g. AUDIT_ID_AGENT_SEND_SUCCESS
+     */
+    public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime) {
+        if (!IS_AUDIT) {
+            return;
+        }
+        AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, 0);
+    }
+
+    /**
+     * Delegates to AuditImp.sendReport(); does nothing if auditing is off.
+     */
+    public static void sendReport() {
+        if (!IS_AUDIT) {
+            return;
+        }
+        AuditImp.getInstance().sendReport();
+    }
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ConfigUtil.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ConfigUtil.java
index a922af0..5bae21d 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ConfigUtil.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ConfigUtil.java
@@ -19,16 +19,15 @@
 
 package org.apache.inlong.agent.utils;
 
+import org.apache.inlong.agent.conf.AgentConfiguration;
+
 import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_PROMETHEUS_ENABLE;
 import static org.apache.inlong.agent.constants.AgentConstants.PROMETHEUS_ENABLE;
 
-import org.apache.inlong.agent.conf.AgentConfiguration;
-
 public class ConfigUtil {
 
     public static boolean isPrometheusEnabled() {
         AgentConfiguration conf = AgentConfiguration.getAgentConf();
         return conf.getBoolean(PROMETHEUS_ENABLE, DEFAULT_PROMETHEUS_ENABLE);
     }
-
 }
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
index 2c8d655..a4f6303 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
@@ -26,6 +26,7 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.utils.ConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,6 +124,7 @@ public class AgentMain {
         CommandLine cl = initOptions(args);
         assert cl != null;
         initAgentConf(cl);
+        AuditUtils.initAudit();
         AgentManager manager = new AgentManager();
         try {
             manager.start();
@@ -141,7 +143,7 @@ public class AgentMain {
             LOGGER.error("exception caught", ex);
         } finally {
             manager.stop();
-
+            AuditUtils.sendReport();
             if (metricsServer != null) {
                 metricsServer.stop();
             }
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index ceae99a..274106f 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -23,6 +23,7 @@ import static org.apache.inlong.agent.constants.JobConstants.JOB_TRIGGER;
 
 import java.lang.reflect.Constructor;
 import java.util.List;
+
 import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
@@ -112,7 +113,7 @@ public class AgentManager extends AbstractDaemon {
             // db is a required component, so if not init correctly,
             // throw exception and stop running.
             return (Db) Class.forName(conf.get(
-                    AgentConstants.AGENT_DB_CLASSNAME, AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
+                            AgentConstants.AGENT_DB_CLASSNAME, AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
                     .newInstance();
         } catch (Exception ex) {
             throw new UnsupportedClassVersionError(ex.getMessage());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index a0e30e7..9800b04 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -50,12 +50,14 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constants.CommonConstants;
 import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.MessageFilter;
 import org.apache.inlong.agent.plugin.message.PackProxyMessage;
@@ -98,10 +100,10 @@ public class ProxySink extends AbstractSink {
     public ProxySink() {
         if (ConfigUtil.isPrometheusEnabled()) {
             this.sinkMetrics = new SinkPrometheusMetrics(AgentUtils.getUniqId(
-                PROXY_SINK_TAG_NAME, index.incrementAndGet()));
+                    PROXY_SINK_TAG_NAME, index.incrementAndGet()));
         } else {
             this.sinkMetrics = new SinkJmxMetric(AgentUtils.getUniqId(
-                PROXY_SINK_TAG_NAME, index.incrementAndGet()));
+                    PROXY_SINK_TAG_NAME, index.incrementAndGet()));
         }
     }
 
@@ -113,19 +115,21 @@ public class ProxySink extends AbstractSink {
             extractStreamFromMessage(message, fieldSplitter);
             if (!(message instanceof EndMessage)) {
                 ProxyMessage proxyMessage = ProxyMessage.parse(message);
-                    // add proxy message to cache.
+                // add proxy message to cache.
                 cache.compute(proxyMessage.getInlongStreamId(),
-                    (s, packProxyMessage) -> {
-                        if (packProxyMessage == null) {
-                            packProxyMessage = new PackProxyMessage(
-                            maxBatchSize, maxQueueNumber,
-                            maxBatchTimeoutMs, proxyMessage.getInlongStreamId());
-                        }
-                        // add message to package proxy
-                        packProxyMessage.addProxyMessage(proxyMessage);
-                        //
-                        return packProxyMessage;
-                    });
+                        (s, packProxyMessage) -> {
+                            if (packProxyMessage == null) {
+                                packProxyMessage = new PackProxyMessage(
+                                        maxBatchSize, maxQueueNumber,
+                                        maxBatchTimeoutMs, proxyMessage.getInlongStreamId());
+                            }
+                            // add message to package proxy
+                            packProxyMessage.addProxyMessage(proxyMessage);
+                            //
+                            return packProxyMessage;
+                        });
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
+                        inlongGroupId, inlongStreamId, System.currentTimeMillis());
                 // increment the count of successful sinks
                 sinkMetrics.incSinkSuccessCount();
             } else {
@@ -138,12 +142,13 @@ public class ProxySink extends AbstractSink {
     /**
      * extract stream id from message if message filter is presented
      * or use the default stream id
+     *
      * @param message
      */
     private void extractStreamFromMessage(Message message, byte[] fieldSplitter) {
         if (messageFilter != null) {
             message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
-                messageFilter.filterStreamId(message, fieldSplitter));
+                    messageFilter.filterStreamId(message, fieldSplitter));
         } else {
             message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId);
         }
@@ -170,8 +175,8 @@ public class ProxySink extends AbstractSink {
                             senderManager.sendBatch(jobInstanceId, inlongGroupId, result.getKey(),
                                     result.getValue(), 0, dataTime);
                             LOGGER.info("send group id {} with message size {}, the job id is {}, read source is {}"
-                                    + "dataTime is {}", inlongGroupId, result.getRight().size(),
-                                jobInstanceId, sourceName, dataTime);
+                                            + "dataTime is {}", inlongGroupId, result.getRight().size(),
+                                    jobInstanceId, sourceName, dataTime);
                         }
 
                     });
@@ -187,20 +192,20 @@ public class ProxySink extends AbstractSink {
     public void init(JobProfile jobConf) {
         maxBatchSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE);
         maxQueueNumber = jobConf.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER,
-            DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
+                DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
         maxBatchTimeoutMs = jobConf.getInt(
-            PROXY_PACKAGE_MAX_TIMEOUT_MS, DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
+                PROXY_PACKAGE_MAX_TIMEOUT_MS, DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
         jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
         batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL,
-            DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
+                DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
         cache = new ConcurrentHashMap<>(10);
         dataTime = AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
-            jobConf.get(JOB_CYCLE_UNIT, ""));
+                jobConf.get(JOB_CYCLE_UNIT, ""));
         inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);
         inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, "");
         messageFilter = initMessageFilter(jobConf);
         fieldSplitter = jobConf.get(CommonConstants.FIELD_SPLITTER, DEFAULT_FIELD_SPLITTER).getBytes(
-            StandardCharsets.UTF_8);
+                StandardCharsets.UTF_8);
         executorService.execute(flushCache());
         senderManager = new SenderManager(jobConf, inlongGroupId, sourceName);
         try {
@@ -240,6 +245,7 @@ public class ProxySink extends AbstractSink {
 
     /**
      * check whether all stream id messages finished
+     *
      * @return
      */
     private boolean sinkFinish() {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index 560068b..8133a7e 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -17,37 +17,16 @@
 
 package org.apache.inlong.agent.plugin.sinks;
 
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_ASYNC;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_BATCH_MAXCOUNT;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_BATCH_MAXSIZE;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_BLOCK_QUEUE;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_COMPRESS_TYPE;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_ENABLE_BATCH;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_MAX_PENDING_COUNT;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_THREAD_NUM;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_SINK_CACHE_CAPACITY;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_SINK_POLL_TIMEOUT;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_ASYNC;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_BATCH_MAXCOUNT;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_BATCH_MAXSIZE;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_BLOCK_QUEUE;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_COMPRESS_TYPE;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_ENABLE_BATCH;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_MAX_PENDING_COUNT;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_THREAD_NUM;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_SERVERS;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_SINK_CACHE_CAPACITY;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_SINK_POLL_TIMEOUT;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_TOPIC;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.MessageFilter;
 import org.apache.inlong.agent.plugin.Sink;
@@ -68,11 +47,39 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_THREAD_NUM;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_THREAD_NUM;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_ASYNC;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_ASYNC;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_SINK_POLL_TIMEOUT;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_SINK_POLL_TIMEOUT;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_SINK_CACHE_CAPACITY;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_SINK_CACHE_CAPACITY;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_SERVERS;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_TOPIC;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_MAX_PENDING_COUNT;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_MAX_PENDING_COUNT;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_BATCH_MAXSIZE;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_BATCH_MAXSIZE;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_BATCH_MAXCOUNT;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_BATCH_MAXCOUNT;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_ENABLE_BATCH;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_ENABLE_BATCH;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_BLOCK_QUEUE;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_BLOCK_QUEUE;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_COMPRESS_TYPE;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_COMPRESS_TYPE;
+
 public class PulsarSink extends AbstractDaemon implements Sink {
+
     private static final Logger LOGGER = LoggerFactory.getLogger(PulsarSink.class);
 
     private static final String PULSAR_SINK_TAG_NAME = "AgentPulsarMetric";
 
+    private String inlongGroupId;
+    private String inlongStreamId;
     private boolean async;
     private long pollTimeout;
     private int threadNum;
@@ -90,14 +97,14 @@ public class PulsarSink extends AbstractDaemon implements Sink {
     public PulsarSink() {
         if (ConfigUtil.isPrometheusEnabled()) {
             this.pluginMetricNew = new PluginPrometheusMetric(AgentUtils.getUniqId(
-                PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
+                    PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
             this.sinkMetrics = new SinkPrometheusMetrics(AgentUtils.getUniqId(
-                PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
+                    PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
         } else {
             this.pluginMetricNew = new PluginJmxMetric(AgentUtils.getUniqId(
-                PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
+                    PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
             this.sinkMetrics = new SinkJmxMetric(AgentUtils.getUniqId(
-                PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
+                    PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
         }
     }
 
@@ -106,6 +113,8 @@ public class PulsarSink extends AbstractDaemon implements Sink {
         if (message != null && writing) {
             // if message is not null
             try {
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
+                        inlongGroupId, inlongStreamId, System.currentTimeMillis());
                 // put message to cache, wait until cache is not full.
                 pluginMetricNew.incSendNum();
                 cache.put(message.getBody());
@@ -131,7 +140,8 @@ public class PulsarSink extends AbstractDaemon implements Sink {
 
     @Override
     public void init(JobProfile jobConf) {
-
+        inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);
+        inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, "");
         threadNum = jobConf.getInt(PULSAR_PRODUCER_THREAD_NUM, DEFAULT_PULSAR_PRODUCER_THREAD_NUM);
         async = jobConf.getBoolean(PULSAR_PRODUCER_ASYNC, DEFAULT_PULSAR_PRODUCER_ASYNC);
         pollTimeout = jobConf.getLong(PULSAR_SINK_POLL_TIMEOUT, DEFAULT_PULSAR_SINK_POLL_TIMEOUT);
@@ -146,7 +156,7 @@ public class PulsarSink extends AbstractDaemon implements Sink {
         try {
             stop();
             LOGGER.info("send success num is {}, failed num is {}",
-                pluginMetricNew.getSendSuccessNum(), pluginMetricNew.getSendFailedNum());
+                    pluginMetricNew.getSendSuccessNum(), pluginMetricNew.getSendFailedNum());
         } catch (Exception ex) {
             LOGGER.error("exception caught", ex);
         }
@@ -154,6 +164,7 @@ public class PulsarSink extends AbstractDaemon implements Sink {
 
     /**
      * sending data with producer.
+     *
      * @param item
      * @param producer
      * @throws PulsarClientException
@@ -181,6 +192,7 @@ public class PulsarSink extends AbstractDaemon implements Sink {
 
     /**
      * thread for sending data.
+     *
      * @return runnable thread.
      */
     private Runnable sendThread(Producer<byte[]> producer) {
@@ -189,7 +201,7 @@ public class PulsarSink extends AbstractDaemon implements Sink {
                 try {
                     byte[] item = cache.poll(pollTimeout, TimeUnit.MILLISECONDS);
                     if (item != null) {
-                      // sending to pulsar
+                        // sending to pulsar
                         sendingData(item, producer);
                     }
                 } catch (Exception ex) {
@@ -201,6 +213,7 @@ public class PulsarSink extends AbstractDaemon implements Sink {
 
     /**
      * construct producer for every thread.
+     *
      * @return
      */
     private Producer<byte[]> constructProducer() {
@@ -211,29 +224,30 @@ public class PulsarSink extends AbstractDaemon implements Sink {
             String pulsarServers = profile.get(PULSAR_SERVERS);
             String pulsarTopic = profile.get(PULSAR_TOPIC);
             int pendingNum = profile.getInt(PULSAR_PRODUCER_MAX_PENDING_COUNT,
-                DEFAULT_PULSAR_PRODUCER_MAX_PENDING_COUNT);
+                    DEFAULT_PULSAR_PRODUCER_MAX_PENDING_COUNT);
             int batchSize = profile.getInt(PULSAR_PRODUCER_BATCH_MAXSIZE,
-                DEFAULT_PULSAR_PRODUCER_BATCH_MAXSIZE);
+                    DEFAULT_PULSAR_PRODUCER_BATCH_MAXSIZE);
             int batchCount = profile.getInt(PULSAR_PRODUCER_BATCH_MAXCOUNT,
-                DEFAULT_PULSAR_PRODUCER_BATCH_MAXCOUNT);
+                    DEFAULT_PULSAR_PRODUCER_BATCH_MAXCOUNT);
             boolean enableBatch = profile.getBoolean(PULSAR_PRODUCER_ENABLE_BATCH,
-                DEFAULT_PULSAR_PRODUCER_ENABLE_BATCH);
+                    DEFAULT_PULSAR_PRODUCER_ENABLE_BATCH);
             boolean blockQueue = profile.getBoolean(PULSAR_PRODUCER_BLOCK_QUEUE, DEFAULT_PULSAR_PRODUCER_BLOCK_QUEUE);
             CompressionType compressionType = PluginUtils.convertType(
-                profile.get(PULSAR_PRODUCER_COMPRESS_TYPE,
-                DEFAULT_PULSAR_PRODUCER_COMPRESS_TYPE));
-            LOGGER.info("init producer, pulsarServers: {}, topic: {}, pendingNum: {}, batchSize: {}, "
-                + "batchCount: {}, enableBatch: {}, compressType: {}, blockQueue: {}", pulsarServers, pulsarTopic,
-                pendingNum, batchSize, batchCount, enableBatch, compressionType, blockQueue);
+                    profile.get(PULSAR_PRODUCER_COMPRESS_TYPE,
+                            DEFAULT_PULSAR_PRODUCER_COMPRESS_TYPE));
+            LOGGER.info("init producer, pulsarServers: {}, topic: {}, pendingNum: {}, batchSize: {},"
+                            + " batchCount: {}, enableBatch: {}, compressType: {}, blockQueue: {}",
+                    pulsarServers, pulsarTopic, pendingNum, batchSize, batchCount,
+                    enableBatch, compressionType, blockQueue);
             client = PulsarClient.builder()
-                .serviceUrl(pulsarServers).build();
+                    .serviceUrl(pulsarServers).build();
             return client.newProducer().topic(pulsarTopic)
-                .compressionType(compressionType)
-                .batchingMaxBytes(batchSize)
-                .batchingMaxMessages(batchCount)
-                .blockIfQueueFull(blockQueue)
-                .maxPendingMessages(pendingNum)
-                .enableBatching(enableBatch).create();
+                    .compressionType(compressionType)
+                    .batchingMaxBytes(batchSize)
+                    .batchingMaxMessages(batchCount)
+                    .blockIfQueueFull(blockQueue)
+                    .maxPendingMessages(pendingNum)
+                    .enableBatching(enableBatch).create();
         } catch (Exception exception) {
             LOGGER.error("error while init producer", exception);
             throw new RuntimeException(exception);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index e8647e9..fa3e9f5 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -21,6 +21,8 @@ import static java.sql.Types.BINARY;
 import static java.sql.Types.BLOB;
 import static java.sql.Types.LONGVARBINARY;
 import static java.sql.Types.VARBINARY;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_STREAM_ID;
 
 import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
@@ -30,11 +32,13 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.CharUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
@@ -53,6 +57,9 @@ public class SqlReader implements Reader {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
 
+    private String inlongGroupId;
+    private String inlongStreamId;
+
     private static final String SQL_READER_TAG_NAME = "AgentSqlMetric";
 
     private static final String JOB_DATABASE_USER = "job.sql.user";
@@ -98,10 +105,10 @@ public class SqlReader implements Reader {
 
         if (ConfigUtil.isPrometheusEnabled()) {
             this.sqlFileMetric = new PluginPrometheusMetric(
-                AgentUtils.getUniqId(SQL_READER_TAG_NAME, metricsIndex.incrementAndGet()));
+                    AgentUtils.getUniqId(SQL_READER_TAG_NAME, metricsIndex.incrementAndGet()));
         } else {
             this.sqlFileMetric = new PluginJmxMetric(
-                AgentUtils.getUniqId(SQL_READER_TAG_NAME, metricsIndex.incrementAndGet()));
+                    AgentUtils.getUniqId(SQL_READER_TAG_NAME, metricsIndex.incrementAndGet()));
         }
     }
 
@@ -128,6 +135,8 @@ public class SqlReader implements Reader {
                     }
                     lineColumns.add(dataValue);
                 }
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
+                        inlongGroupId, inlongStreamId, System.currentTimeMillis());
                 sqlFileMetric.incReadNum();
 
                 return generateMessage(lineColumns);
@@ -186,6 +195,8 @@ public class SqlReader implements Reader {
 
     @Override
     public void init(JobProfile jobConf) {
+        inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);
+        inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, "");
         int batchSize = jobConf.getInt(JOB_DATABASE_BATCH_SIZE, DEFAULT_JOB_DATABASE_BATCH_SIZE);
         String userName = jobConf.get(JOB_DATABASE_USER);
         String password = jobConf.get(JOB_DATABASE_PASSWORD);
@@ -193,17 +204,17 @@ public class SqlReader implements Reader {
         int port = jobConf.getInt(JOB_DATABASE_PORT);
 
         String driverClass = jobConf.get(JOB_DATABASE_DRIVER_CLASS,
-            DEFAULT_JOB_DATABASE_DRIVER_CLASS);
+                DEFAULT_JOB_DATABASE_DRIVER_CLASS);
         separator = jobConf.get(JOB_DATABASE_SEPARATOR, STD_FIELD_SEPARATOR_SHORT);
         finished = false;
         try {
             String databaseType = jobConf.get(JOB_DATABASE_TYPE, MYSQL);
             String url = String.format("jdbc:%s://%s:%d", databaseType, hostName, port);
             conn = AgentDbUtils.getConnectionFailover(
-                driverClass, url, userName, password);
+                    driverClass, url, userName, password);
             if (databaseType.equals(MYSQL)) {
                 statement = conn.createStatement(
-                    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+                        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
                 statement.setFetchSize(Integer.MIN_VALUE);
                 resultSet = statement.executeQuery(sql);
             } else {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index 646fe9d..d3c0328 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -17,6 +17,11 @@
 
 package org.apache.inlong.agent.plugin.sources.reader;
 
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+
 import static org.apache.inlong.agent.constants.JobConstants.DEFAULT_JOB_FILE_MAX_WAIT;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_FILE_MAX_WAIT;
 
@@ -29,9 +34,11 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.Validator;
@@ -53,6 +60,8 @@ public class TextFileReader implements Reader {
 
     public static final int NEVER_STOP_SIGN = -1;
 
+    private String inlongGroupId;
+    private String inlongStreamId;
     private final File file;
     private final int position;
     private final String md5;
@@ -76,10 +85,10 @@ public class TextFileReader implements Reader {
 
         if (ConfigUtil.isPrometheusEnabled()) {
             textFileMetric = new PluginPrometheusMetric(AgentUtils.getUniqId(
-                TEXT_FILE_READER_TAG_NAME, metricsIndex.incrementAndGet()));
+                    TEXT_FILE_READER_TAG_NAME, metricsIndex.incrementAndGet()));
         } else {
             textFileMetric = new PluginJmxMetric(AgentUtils.getUniqId(
-                TEXT_FILE_READER_TAG_NAME, metricsIndex.incrementAndGet()));
+                    TEXT_FILE_READER_TAG_NAME, metricsIndex.incrementAndGet()));
         }
     }
 
@@ -92,6 +101,8 @@ public class TextFileReader implements Reader {
         if (iterator != null && iterator.hasNext()) {
             String message = iterator.next();
             if (validateMessage(message)) {
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
+                        inlongGroupId, inlongStreamId, System.currentTimeMillis());
                 textFileMetric.incReadNum();
                 return new DefaultMessage(message.getBytes(StandardCharsets.UTF_8));
             }
@@ -159,6 +170,8 @@ public class TextFileReader implements Reader {
             LOGGER.info("file name for task is {}, md5 is {}", file, md5);
             stream = Files.newBufferedReader(file.toPath()).lines().skip(position);
             iterator = stream.iterator();
+            inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+            inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
         } catch (Exception ex) {
             throw new FileException("error init stream for " + file.getPath(), ex);
         }
@@ -166,7 +179,7 @@ public class TextFileReader implements Reader {
 
     private void initReadTimeout(JobProfile jobConf) {
         int waitTime = jobConf.getInt(JOB_FILE_MAX_WAIT,
-            DEFAULT_JOB_FILE_MAX_WAIT);
+                DEFAULT_JOB_FILE_MAX_WAIT);
         if (waitTime == NEVER_STOP_SIGN) {
             timeout = NEVER_STOP_SIGN;
         } else {
@@ -178,6 +191,6 @@ public class TextFileReader implements Reader {
     public void destroy() {
         AgentUtils.finallyClose(stream);
         LOGGER.info("destroy reader with read {} num {}",
-            textFileMetric.getTagName(), textFileMetric.getReadNum());
+                textFileMetric.getTagName(), textFileMetric.getReadNum());
     }
 }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 570fc89..c548f5e 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.agent.plugin;
 
 import static org.apache.inlong.agent.constants.AgentConstants.AGENT_MESSAGE_FILTER_CLASSNAME;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_CYCLE_UNIT;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_DIR_FILTER_PATTERN;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_FILE_MAX_WAIT;
@@ -109,6 +111,8 @@ public class TestFileAgent {
                 profile.set(JOB_DIR_FILTER_PATTERN, Paths.get(testRootDir.toString(),
                     "hugeFile.[0-9].txt").toString());
                 profile.set(JOB_READ_WAIT_TIMEOUT, String.valueOf(readWaitTimeMilliseconds));
+                profile.set(PROXY_INLONG_GROUP_ID, "groupid");
+                profile.set(PROXY_INLONG_STREAM_ID, "streamid");
                 agent.submitJob(profile);
             }
         }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index 12c39c8..abdc1bb 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_DIR_FILTER_PATTERN;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_FILE_MAX_WAIT;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_INSTANCE_ID;
@@ -35,6 +37,7 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
+
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.plugin.Message;
@@ -93,8 +96,10 @@ public class TestTextFileReader {
         JobProfile jobConfiguration = JobProfile.parseJsonStr("{}");
         String mainPath = Paths.get(uri).toString();
         jobConfiguration.set(JOB_DIR_FILTER_PATTERN, Paths.get(mainPath,
-            "[1-2].txt").toFile().getAbsolutePath());
+                "[1-2].txt").toFile().getAbsolutePath());
         jobConfiguration.set(JOB_INSTANCE_ID, "test");
+        jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
+        jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
         TextFileSource fileSource = new TextFileSource();
         List<Reader> readerList = fileSource.split(jobConfiguration);
         Assert.assertEquals(2, readerList.size());
@@ -126,7 +131,10 @@ public class TestTextFileReader {
         }
         Files.write(localPath, afterList, StandardOpenOption.APPEND);
         TextFileReader reader = new TextFileReader(localPath.toFile(), 1000);
-        reader.init(new JobProfile());
+        JobProfile jobProfile = new JobProfile();
+        jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
+        jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
+        reader.init(jobProfile);
 
         Assert.assertEquals("world", new String(reader.read().getBody()));
 
@@ -136,6 +144,8 @@ public class TestTextFileReader {
     public void testTextTailTimeout() throws Exception {
         JobProfile jobProfile = new JobProfile();
         jobProfile.setInt(JOB_FILE_MAX_WAIT, 1);
+        jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
+        jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
         Path localPath = Paths.get(testDir.toString(), "test1.txt");
         TextFileReader reader = new TextFileReader(localPath.toFile(), 0);
         if (localPath.toFile().exists()) {