You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/21 02:36:39 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s (#5910)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 11cb26456 [INLONG-5909][Agent] Fix WatchService is no effect in the k8s (#5910)
11cb26456 is described below

commit 11cb26456f0079a519eaa1173ed3087649b8435e
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Wed Sep 21 10:28:27 2022 +0800

    [INLONG-5909][Agent] Fix WatchService is no effect in the k8s (#5910)
---
 .../apache/inlong/agent/constant/JobConstants.java |  2 +-
 .../main/java/org/apache/inlong/agent/db/Db.java   | 10 +++
 .../org/apache/inlong/agent/db/JobProfileDb.java   | 28 ++++++++
 .../org/apache/inlong/agent/db/RocksDbImp.java     | 17 +++++
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  1 +
 .../org/apache/inlong/agent/core/AgentManager.java |  6 +-
 .../java/org/apache/inlong/agent/core/job/Job.java | 25 ++++++-
 .../apache/inlong/agent/core/job/JobManager.java   | 10 ++-
 .../apache/inlong/agent/core/job/JobWrapper.java   | 11 +++
 .../org/apache/inlong/agent/core/task/Task.java    |  4 ++
 .../apache/inlong/agent/core/task/TaskManager.java |  4 ++
 .../inlong/agent/core/trigger/TriggerManager.java  | 83 +++++++++++++++++-----
 .../agent/plugin/fetcher/ManagerFetcher.java       | 26 +++++++
 .../agent/plugin/filter/DateFormatRegex.java       | 10 ++-
 .../sources/reader/file/FileReaderOperator.java    |  3 +
 .../sources/reader/file/MonitorTextFile.java       |  1 +
 .../plugin/sources/reader/file/TextFileReader.java | 12 +++-
 .../agent/plugin/trigger/DirectoryTrigger.java     |  7 +-
 .../inlong/agent/plugin/trigger/PathPattern.java   |  1 +
 .../inlong/agent/plugin/utils/PluginUtils.java     |  7 +-
 .../apache/inlong/agent/plugin/TestFileAgent.java  |  8 +--
 .../agent/plugin/filter/TestDateFormatRegex.java   | 39 +++++++---
 .../agent/plugin/sources/TestTextFileReader.java   |  8 +++
 .../src/test/resources/fileAgent.trigger.json      |  4 +-
 .../src/test/resources/fileAgentJob.json           |  4 +-
 25 files changed, 279 insertions(+), 52 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 1894d1507..5a8b01469 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -103,7 +103,7 @@ public class JobConstants extends CommonConstants {
 
     public static final String JOB_READ_WAIT_TIMEOUT = "job.file.read.wait";
 
-    public static final int DEFAULT_JOB_READ_WAIT_TIMEOUT = 100;
+    public static final int DEFAULT_JOB_READ_WAIT_TIMEOUT = 10;
 
     public static final String JOB_ID_PREFIX = "job_";
 
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
index 3218986f2..3efaa24db 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
@@ -76,6 +76,16 @@ public interface Db extends Closeable {
      */
     List<KeyValueEntity> search(StateSearchKey searchKey);
 
+
+    /**
+     * search keyValue list by search key.
+     *
+     * @param searchKeys search keys.
+     * @return key/value list
+     * @throws NullPointerException search key should not be null.
+     */
+    List<KeyValueEntity> search(List<StateSearchKey> searchKeys);
+
     /**
      * search keyValue list by search key.
      *
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
index a3ada2d1a..11f1739e4 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
@@ -24,7 +24,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
 
 /**
  * Wrapper for job conf persistence.
@@ -192,4 +198,26 @@ public class JobProfileDb {
         }
         return profileList;
     }
+
+    /**
+     * check local job state from rocksDB.
+     *
+     * @return KV, key is job id and value is subtask config of job
+     */
+    public Map<String, List<String>> getJobsState() {
+        List<KeyValueEntity> entityList = db.search(Arrays.asList(StateSearchKey.values()));
+        Map<String, List<String>> jobStateMap = new HashMap<>();
+        for (KeyValueEntity entity : entityList) {
+            List<String> tmpList = new ArrayList<>();
+            JobProfile jobProfile = entity.getAsJobProfile();
+            String jobState = entity.getStateSearchKey().name().concat(":").concat(jobProfile.toJsonStr());
+            tmpList.add(jobState);
+            List<String> jobStates = jobStateMap.putIfAbsent(jobProfile.get(JOB_ID), tmpList);
+            if (Objects.nonNull(jobStates) && !jobStates.contains(jobState)) {
+                jobStates.addAll(tmpList);
+                jobStateMap.put(jobProfile.get(JOB_ID), jobStates);
+            }
+        }
+        return jobStateMap;
+    }
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
index 0db14cb3f..f2578b3bb 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
@@ -39,6 +39,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -242,6 +243,22 @@ public class RocksDbImp implements Db {
         return results;
     }
 
+    @Override
+    public List<KeyValueEntity> search(List<StateSearchKey> searchKeys) {
+        List<KeyValueEntity> results = new LinkedList<>();
+        try (final RocksIterator it = db.newIterator(columnHandlesMap.get(defaultFamilyName))) {
+            it.seekToFirst();
+            while (it.isValid()) {
+                KeyValueEntity keyValue = GSON.fromJson(new String(it.value()), KeyValueEntity.class);
+                if (Objects.nonNull(keyValue) && searchKeys.contains(keyValue.getStateSearchKey())) {
+                    results.add(keyValue);
+                }
+                it.next();
+            }
+        }
+        return results;
+    }
+
     @Override
     public List<CommandEntity> searchCommands(boolean isAcked) {
         List<CommandEntity> results = new LinkedList<>();
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index e1a1482f0..6d566c65c 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -119,6 +119,7 @@ public class JobProfileDto {
         if (null != fileJobTaskConfig.getLineEndPattern()) {
             FileJob.Line line = new Line();
             line.setEndPattern(fileJobTaskConfig.getLineEndPattern());
+            fileJob.setLine(line);
         }
 
         if (null != fileJobTaskConfig.getEnvList()) {
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 8aaede49c..ad01243e1 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -111,7 +111,7 @@ public class AgentManager extends AbstractDaemon {
             // db is a required component, so if not init correctly,
             // throw exception and stop running.
             return (Db) Class.forName(conf.get(
-                            AgentConstants.AGENT_DB_CLASSNAME, AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
+                    AgentConstants.AGENT_DB_CLASSNAME, AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
                     .newInstance();
         } catch (Exception ex) {
             throw new UnsupportedClassVersionError(ex.getMessage());
@@ -126,6 +126,10 @@ public class AgentManager extends AbstractDaemon {
         return db;
     }
 
+    public JobProfileDb getJobProfileDb() {
+        return jobProfileDb;
+    }
+
     public ProfileFetcher getFetcher() {
         return fetcher;
     }
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index 9b2cf11e2..16e5b8f1f 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -37,13 +37,18 @@ import java.util.List;
 public class Job {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(Job.class);
-
+    private static int COUNTER = 1;
     private final JobProfile jobConf;
     // job name
     private String name;
     // job description
     private String description;
     private String jobInstanceId;
+    private ThreadLocal<Integer> threadNum = new ThreadLocal<Integer>() {
+        protected Integer initialValue() {
+            return 0;
+        }
+    };
 
     public Job(JobProfile jobConf) {
         this.jobConf = jobConf;
@@ -83,8 +88,21 @@ public class Job {
      * @return taskList
      */
     public List<Task> createTasks() {
+        return getTasks(this.jobConf);
+    }
+
+    /**
+     * build task from job config
+     *
+     * @param jobConf subtask config in the job
+     * @return new task
+     */
+    public Task createTask(JobProfile jobConf) {
+        return getTasks(jobConf).isEmpty() ? null : createTasks().get(0);
+    }
+
+    private List<Task> getTasks(JobProfile jobConf) {
         List<Task> taskList = new ArrayList<>();
-        int index = 0;
         try {
             LOGGER.info("job id: {}, source: {}, channel: {}, sink: {}",
                     getJobInstanceId(), jobConf.get(JobConstants.JOB_SOURCE_CLASS),
@@ -95,7 +113,8 @@ public class Job {
                 Sink writer = (Sink) Class.forName(jobConf.get(JobConstants.JOB_SINK)).newInstance();
                 writer.setSourceName(reader.getReadSource());
                 Channel channel = (Channel) Class.forName(jobConf.get(JobConstants.JOB_CHANNEL)).newInstance();
-                String taskId = String.format("%s_%d", jobInstanceId, index++);
+                String taskId = String.format("%s_%d", jobInstanceId, threadNum.get());
+                threadNum.set(threadNum.get() + COUNTER);
                 taskList.add(new Task(taskId, reader, writer, channel, getJobConf()));
             }
         } catch (Throwable ex) {
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index a5fb4fb21..f592097d8 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -28,6 +28,7 @@ import org.apache.inlong.agent.db.StateSearchKey;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.GsonUtil;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.slf4j.Logger;
@@ -114,16 +115,20 @@ public class JobManager extends AbstractDaemon {
      * @param job job
      */
     private void addJob(Job job) {
+        if (pendingJobs.containsKey(job.getJobInstanceId())) {
+            return;
+        }
         try {
             JobWrapper jobWrapper = new JobWrapper(agentManager, job);
-            this.runningPool.execute(jobWrapper);
             JobWrapper jobWrapperRet = jobs.putIfAbsent(jobWrapper.getJob().getJobInstanceId(), jobWrapper);
             if (jobWrapperRet != null) {
                 LOGGER.warn("{} has been added to running pool, "
                         + "cannot be added repeatedly", job.getJobInstanceId());
+                return;
             } else {
                 getJobMetric().jobRunningCount.incrementAndGet();
             }
+            this.runningPool.execute(jobWrapper);
         } catch (Exception rje) {
             LOGGER.debug("reject job {}", job.getJobInstanceId(), rje);
             pendingJobs.putIfAbsent(job.getJobInstanceId(), job);
@@ -241,6 +246,9 @@ public class JobManager extends AbstractDaemon {
             while (isRunnable()) {
                 try {
                     jobProfileDb.removeExpireJobs(jobDbCacheTime);
+                    // TODO: manager handles those job state in the future and it's saved locally now.
+                    Map<String, List<String>> jobStateMap = jobProfileDb.getJobsState();
+                    LOGGER.info("check local job state: {}", GsonUtil.toJson(jobStateMap));
                 } catch (Exception ex) {
                     LOGGER.error("removeExpireJobs error caught", ex);
                 }
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
index cd51aff40..160c7d685 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.core.job;
 
 import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.core.AgentManager;
 import org.apache.inlong.agent.core.task.Task;
@@ -166,7 +167,17 @@ public class JobWrapper extends AbstractStateWrapper {
         }));
     }
 
+    /**
+     * get all running task
+     */
     public List<Task> getAllTasks() {
         return allTasks;
     }
+
+    public synchronized void addTask(JobProfile jobProfile) {
+        Task task = job.createTask(jobProfile);
+        allTasks.add(task);
+        LOGGER.info("job name is {} and add new task, total task {}", job.getName(), allTasks.size());
+        taskManager.submitTask(task);
+    }
 }
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
index ad5249ec0..9ac2ed4d6 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
@@ -62,6 +62,10 @@ public class Task {
         return channel;
     }
 
+    public JobProfile getJobConf() {
+        return jobConf;
+    }
+
     public void init() {
         this.channel.init(jobConf);
         this.sink.init(jobConf);
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index e6cfaf061..a08f8f0a2 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -128,6 +128,10 @@ public class TaskManager extends AbstractDaemon {
             boolean notSubmitted = true;
             while (notSubmitted) {
                 try {
+                    if (this.runningPool.isShutdown()) {
+                        LOGGER.error("submit task error because thread pool is closed");
+                        break;
+                    }
                     this.runningPool.submit(wrapper);
                     notSubmitted = false;
                 } catch (Exception ex) {
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
index 4b3b3b304..fa2308139 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
@@ -26,6 +26,9 @@ import org.apache.inlong.agent.constant.FileCollectType;
 import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.core.AgentManager;
 import org.apache.inlong.agent.core.job.JobWrapper;
+import org.apache.inlong.agent.core.task.Task;
+import org.apache.inlong.agent.db.JobProfileDb;
+import org.apache.inlong.agent.db.StateSearchKey;
 import org.apache.inlong.agent.db.TriggerProfileDb;
 import org.apache.inlong.agent.plugin.Trigger;
 import org.apache.inlong.agent.utils.ThreadUtils;
@@ -34,11 +37,14 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM;
 import static org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
 import static org.apache.inlong.agent.constant.JobConstants.TRIGGER_ONLY_ONE_JOB;
 
@@ -86,7 +92,7 @@ public class TriggerManager extends AbstractDaemon {
             trigger.init(triggerProfile);
             trigger.run();
         } catch (Throwable ex) {
-            LOGGER.error("exception caught", ex);
+            LOGGER.error("add trigger error: ", ex);
             ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
             return false;
         }
@@ -126,7 +132,7 @@ public class TriggerManager extends AbstractDaemon {
         if (FileCollectType.INCREMENT.equals(syncType)) {
             return;
         }
-        LOGGER.info("initialize submit full path. trigger {} ", profile.getTriggerId());
+        LOGGER.info("initialize submit full sync trigger {}", profile.getTriggerId());
         manager.getJobManager().submitFileJobProfile(profile);
     }
 
@@ -141,33 +147,75 @@ public class TriggerManager extends AbstractDaemon {
                             if (triggerProfile.getBoolean(TRIGGER_ONLY_ONE_JOB, false)) {
                                 deleteRelatedJobs(triggerProfile.getTriggerId());
                             }
-                            manager.getJobManager().submitFileJobProfile(profile);
-                            addToTriggerMap(profile.get(JOB_ID), profile);
+                            Map<String, JobWrapper> jobWrapperMap = manager.getJobManager().getJobs();
+                            // running job then add new task
+                            if (isRunningJob(profile, jobWrapperMap)) {
+                                jobWrapperMap.get(profile.getInstanceId()).addTask(profile);
+                            }
+                            // not running job then add job
+                            if (isExistJob(profile)) {
+                                manager.getJobManager().submitFileJobProfile(profile);
+                                addToTriggerMap(profile.get(JOB_ID), profile);
+                            }
                         }
                     });
                     TimeUnit.SECONDS.sleep(triggerFetchInterval);
                 } catch (Throwable e) {
                     LOGGER.info("ignored Exception ", e);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
-
                 }
             }
-
         };
     }
 
+    private boolean isRunningJob(JobProfile profile, Map<String, JobWrapper> jobWrapperMap) {
+        JobWrapper jobWrapper;
+        try {
+            jobWrapper = jobWrapperMap.get(profile.getInstanceId());
+        } catch (Exception e) {
+            LOGGER.warn("get jobWrapper error: ", e);
+            return false;
+        }
+        List<Task> tasks = jobWrapper.getAllTasks();
+        for (Task task : tasks) {
+            JobProfile runJobProfile = task.getJobConf();
+            if (Objects.equals(runJobProfile.get(JOB_DIR_FILTER_PATTERN), profile.get(JOB_DIR_FILTER_PATTERN))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean isExistJob(JobProfile profile) {
+        List<JobProfile> jobProfileList = getJobProfiles();
+        AtomicBoolean isExist = new AtomicBoolean(false);
+        jobProfileList.forEach(job -> {
+            if (profile.get(JOB_ID).equals(job.get(JOB_ID))) {
+                isExist.set(true);
+            }
+        });
+        return !isExist.get();
+    }
+
     /**
      * delete jobs generated by the trigger
      */
     private void deleteRelatedJobs(String triggerId) {
         LOGGER.info("start to delete related jobs in triggerId {}", triggerId);
-        ConcurrentHashMap<String, JobProfile> jobProfiles =
-                triggerJobMap.get(triggerId);
-        if (jobProfiles != null) {
-            LOGGER.info("trigger can only run one job, stop the others {}", jobProfiles.keySet());
-            jobProfiles.keySet().forEach(this::deleteJob);
-            triggerJobMap.remove(triggerId);
-        }
+        List<JobProfile> jobProfileList = getJobProfiles();
+        jobProfileList.forEach(jobProfile -> {
+            if (Objects.equals(jobProfile.get(JOB_ID), triggerId)) {
+                deleteJob(jobProfile.getInstanceId());
+            }
+        });
+        triggerJobMap.remove(triggerId);
+    }
+
+    private List<JobProfile> getJobProfiles() {
+        JobProfileDb jobProfileDb = manager.getJobProfileDb();
+        List<JobProfile> jobProfileList = jobProfileDb.getJobsByState(StateSearchKey.RUNNING);
+        jobProfileList.addAll(jobProfileDb.getJobsByState(StateSearchKey.ACCEPTED));
+        return jobProfileList;
     }
 
     private void deleteJob(String jobInstanceId) {
@@ -180,8 +228,7 @@ public class TriggerManager extends AbstractDaemon {
                 try {
                     triggerJobMap.forEach((s, jobProfiles) -> {
                         for (String jobId : jobProfiles.keySet()) {
-                            Map<String, JobWrapper> jobs =
-                                    manager.getJobManager().getJobs();
+                            Map<String, JobWrapper> jobs = manager.getJobManager().getJobs();
                             if (jobs.get(jobId) == null) {
                                 triggerJobMap.remove(jobId);
                             }
@@ -201,10 +248,8 @@ public class TriggerManager extends AbstractDaemon {
      * need to put profile in triggerJobMap
      */
     private void addToTriggerMap(String triggerId, JobProfile profile) {
-        ConcurrentHashMap<String, JobProfile> tmpList =
-                new ConcurrentHashMap<>();
-        ConcurrentHashMap<String, JobProfile> jobWrappers =
-                triggerJobMap.putIfAbsent(triggerId, tmpList);
+        ConcurrentHashMap<String, JobProfile> tmpList = new ConcurrentHashMap<>();
+        ConcurrentHashMap<String, JobProfile> jobWrappers = triggerJobMap.putIfAbsent(triggerId, tmpList);
         if (jobWrappers == null) {
             jobWrappers = tmpList;
         }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 7eab6d2f4..cc1b4a9e2 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -30,6 +30,8 @@ import org.apache.inlong.agent.conf.ProfileFetcher;
 import org.apache.inlong.agent.conf.TriggerProfile;
 import org.apache.inlong.agent.core.AgentManager;
 import org.apache.inlong.agent.db.CommandDb;
+import org.apache.inlong.agent.db.JobProfileDb;
+import org.apache.inlong.agent.db.StateSearchKey;
 import org.apache.inlong.agent.plugin.Trigger;
 import org.apache.inlong.agent.plugin.utils.PluginUtils;
 import org.apache.inlong.agent.pojo.ConfirmAgentIpRequest;
@@ -55,7 +57,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
@@ -85,6 +89,7 @@ import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MA
 import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH;
 import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_VIP_HTTP_PATH;
 import static org.apache.inlong.agent.constant.FetcherConstants.VERSION;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_OP;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
@@ -287,6 +292,9 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
         }
         for (DataConfig dataConfig : taskResult.getDataConfigs()) {
             TriggerProfile profile = TriggerProfile.getTriggerProfiles(dataConfig);
+            if (triggerIsRunning(profile)) {
+                continue;
+            }
             LOGGER.info("the triggerProfile: {}", profile.toJsonStr());
             if (profile.hasKey(JOB_TRIGGER)) {
                 dealWithTdmTriggerProfile(profile);
@@ -300,6 +308,24 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
         }
     }
 
+    private boolean triggerIsRunning(TriggerProfile newProfile) {
+        int type = ManagerOpEnum.getOpType(newProfile.getInt(JOB_OP)).getType();
+        if (ManagerOpEnum.ACTIVE.getType() != type || ManagerOpEnum.ADD.getType() != type) {
+            return false;
+        }
+        JobProfileDb jobProfileDb = agentManager.getJobProfileDb();
+        List<JobProfile> jobsByState = jobProfileDb.getJobsByState(StateSearchKey.ACCEPTED);
+        jobsByState.addAll(jobProfileDb.getJobsByState(StateSearchKey.RUNNING));
+        AtomicBoolean jobIsRunning = new AtomicBoolean(false);
+        jobsByState.forEach(jobProfile -> {
+            if (Objects.equals(jobProfile.get(JOB_ID), newProfile.get(JOB_ID))) {
+                LOGGER.error("job is running or accepted, {} submit failed", newProfile.get(JOB_ID));
+                jobIsRunning.set(true);
+            }
+        });
+        return jobIsRunning.get();
+    }
+
     /**
      * form file command fetch request
      */
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java
index ceaaacf73..1ca91e81f 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java
@@ -35,9 +35,12 @@ public class DateFormatRegex implements Filter {
     private static final Logger LOGGER = LoggerFactory.getLogger(DateFormatRegex.class);
 
     private static final String YEAR = "YYYY";
+    private static final String YEAR_LOWERCASE = "yyyy";
     private static final String MONTH = "MM";
     private static final String DAY = "DD";
+    private static final String DAY_LOWERCASE = "dd";
     private static final String HOUR = "HH";
+    private static final String MINUTE = "mm";
 
     private static final String NORMAL_FORMATTER = "yyyyMMddHHmm";
 
@@ -104,11 +107,14 @@ public class DateFormatRegex implements Filter {
                 File.separator, 0);
         List<String> formattedList = new ArrayList<>();
         for (String regexStr : regexList) {
-            if (regexStr.contains(YEAR)) {
+            if (regexStr.contains(YEAR) || regexStr.contains(YEAR_LOWERCASE)) {
                 String tmpRegexStr = regexStr.replace(YEAR, time.substring(0, 4))
+                        .replace(YEAR_LOWERCASE,time.substring(0, 4))
                         .replace(MONTH, time.substring(4, 6))
                         .replace(DAY, time.substring(6, 8))
-                        .replace(HOUR, time.substring(8, 10));
+                        .replace(DAY_LOWERCASE,time.substring(6, 8))
+                        .replace(HOUR, time.substring(8, 10))
+                        .replace(MINUTE,time.substring(10));
                 formattedList.add(tmpRegexStr);
                 formattedTime = time;
             } else {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 2ff0bf189..d9a9f5451 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -98,6 +98,9 @@ public class FileReaderOperator extends AbstractReader {
     }
 
     private boolean validateMessage(String message) {
+        if (StringUtils.isBlank(message)) {
+            return false;
+        }
         if (validators.isEmpty()) {
             return true;
         }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index 0c6f6a72e..d8160bd8b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -106,6 +106,7 @@ public final class MonitorTextFile {
         public void run() {
             try {
                 TimeUnit.SECONDS.sleep(WAIT_TIME);
+                LOGGER.info("start {} monitor", this.fileReaderOperator.file.getAbsolutePath());
                 while (!this.fileReaderOperator.finished) {
                     long expireTime = Long.parseLong(fileReaderOperator.jobConf
                             .get(JOB_FILE_MONITOR_EXPIRE, JOB_FILE_MONITOR_DEFAULT_EXPIRE));
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
index bda3a5436..cf9707e19 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
@@ -58,7 +58,8 @@ public final class TextFileReader extends AbstractFileReader {
         List<String> lines = Files.newBufferedReader(fileReaderOperator.file.toPath()).lines().skip(
                 fileReaderOperator.position)
                 .collect(Collectors.toList());
-        LOGGER.info("path is {}, data reads size {}", fileReaderOperator.file.getName(), lines.size());
+        LOGGER.info("path is {}, position is {}, data reads size {}", fileReaderOperator.file.getName(),
+                fileReaderOperator.position, lines.size());
         List<String> resultLines = new ArrayList<>();
         //TODO line regular expression matching
         if (fileReaderOperator.jobConf.hasKey(JOB_FILE_LINE_END_PATTERN)) {
@@ -70,7 +71,8 @@ public final class TextFileReader extends AbstractFileReader {
                 String data = lineStringBuffer.get(fileReaderOperator.file);
                 Matcher matcher = pattern.matcher(data);
                 if (matcher.find() && StringUtils.isNoneBlank(matcher.group())) {
-                    String[] splitLines = data.split(matcher.group());
+                    String splitStr = matcher.group();
+                    String[] splitLines = data.split(splitStr);
                     int length = splitLines.length;
                     for (int i = 0; i < length; i++) {
                         if (i > 0 && i == length - 1 && null != splitLines[i]) {
@@ -79,6 +81,12 @@ public final class TextFileReader extends AbstractFileReader {
                         }
                         resultLines.add(splitLines[i].trim());
                     }
+                    // handles cases where the ending is a delimiter.
+                    // for example--> line ends pattern: ab{2} and line string: cabbdabbfabb
+                    if (data.startsWith(splitStr, data.length() - splitStr.length() - 1)) {
+                        length = 1;
+                        resultLines.add(lineStringBuffer.get(fileReaderOperator.file));
+                    }
                     if (1 == length) {
                         lineStringBuffer.remove(fileReaderOperator.file);
                     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
index 222969dd3..92a3d2406 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
@@ -46,6 +46,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
+
 /**
  * Watch directory, if new valid files are created, create jobs correspondingly.
  */
@@ -272,9 +274,8 @@ public class DirectoryTrigger extends AbstractDaemon implements Trigger {
         interval = profile.getInt(
                 AgentConstants.TRIGGER_CHECK_INTERVAL, AgentConstants.DEFAULT_TRIGGER_CHECK_INTERVAL);
         this.profile = profile;
-
-        if (this.profile.hasKey(JobConstants.JOB_DIR_FILTER_PATTERN)) {
-            String pathPattern = this.profile.get(JobConstants.JOB_DIR_FILTER_PATTERN);
+        if (this.profile.hasKey(JOB_DIR_FILTER_PATTERN)) {
+            String pathPattern = this.profile.get(JOB_DIR_FILTER_PATTERN);
             String timeOffset = this.profile.get(JobConstants.JOB_FILE_TIME_OFFSET, "");
             if (timeOffset.isEmpty()) {
                 register(pathPattern);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java
index dda014e32..701de9067 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java
@@ -59,6 +59,7 @@ public class PathPattern {
         rootDir = findRoot(watchDir);
         subDirs = new HashSet<>();
         dateFormatRegex = DateFormatRegex.ofRegex(watchDir).withOffset(offset);
+        updateDateFormatRegex();
     }
 
     /**
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
index 6855dc9fd..1fc8a68a9 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
@@ -43,6 +43,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.AGENT_OS_NAME;
 import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM;
 import static org.apache.inlong.agent.constant.CommonConstants.FILE_MAX_NUM;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TIME_OFFSET;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
 
 /**
@@ -78,7 +79,9 @@ public class PluginUtils {
     public static Collection<File> findSuitFiles(JobProfile jobConf) {
         String dirPattern = jobConf.get(JOB_DIR_FILTER_PATTERN);
         LOGGER.info("start to find files with dir pattern {}", dirPattern);
-        PathPattern pattern = new PathPattern(dirPattern);
+        PathPattern pattern =
+                jobConf.hasKey(JOB_FILE_TIME_OFFSET) ? new PathPattern(dirPattern, jobConf.get(JOB_FILE_TIME_OFFSET))
+                        : new PathPattern(dirPattern);
         updateRetryTime(jobConf, pattern);
         int maxFileNum = jobConf.getInt(FILE_MAX_NUM, DEFAULT_FILE_MAX_NUM);
         LOGGER.info("dir pattern {}, max file num {}", dirPattern, maxFileNum);
@@ -132,7 +135,7 @@ public class PluginUtils {
                 allIps.add(InetAddress.getLocalHost().getHostAddress());
             }
         } catch (Exception e) {
-            LOGGER.error("get local ip list fail with ex {} ", e);
+            LOGGER.error("get local ip list fail with ex:", e);
         }
         return allIps;
     }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 6461bef64..27bb0dc10 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -55,6 +55,7 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTE
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_COLLECT_TYPE;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TIME_OFFSET;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
 import static org.awaitility.Awaitility.await;
 
@@ -161,21 +162,16 @@ public class TestFileAgent {
     public void testOneJobFullPath() throws Exception {
         URI uri = Objects.requireNonNull(getClass().getClassLoader().getResource("test")).toURI();
         String path = Paths.get(uri).toString();
-        String fileName = path + "/increment_test.txt";
-        TestUtils.deleteFile(fileName);
-
         String jsonString = TestUtils.getTestTriggerProfile();
         TriggerProfile triggerProfile = TriggerProfile.parseJsonStr(jsonString);
         triggerProfile.set(JOB_DIR_FILTER_PATTERN, path);
         triggerProfile.set(JOB_FILE_MAX_WAIT, "-1");
         triggerProfile.set(JOB_FILE_COLLECT_TYPE, FileCollectType.FULL);
+        triggerProfile.set(JOB_ID, "2");
         TriggerManager triggerManager = agent.getManager().getTriggerManager();
         triggerManager.submitTrigger(triggerProfile);
         Thread.sleep(2000);
         Assert.assertEquals(3L, checkFullPathReadJob().longValue());
-        TestUtils.createFile(fileName);
-        Thread.sleep(10000);
-        TestUtils.deleteFile(fileName);
     }
 
     private boolean checkOnlyOneJob() {
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java
index 6e7a7ed2b..6a17849db 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java
@@ -17,23 +17,30 @@
 
 package org.apache.inlong.agent.plugin.filter;
 
-import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-
-import java.io.File;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.sources.TextFileSource;
+import org.apache.inlong.agent.plugin.trigger.PathPattern;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Locale;
+
+import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_GROUP_ID;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
+
 public class TestDateFormatRegex {
 
     private static AgentBaseTestsHelper helper;
@@ -54,12 +61,22 @@ public class TestDateFormatRegex {
     public void testRegex() {
         File file = Paths.get(helper.getParentPath().toString(), "aad20201201_11.log").toFile();
         DateFormatRegex dateFormatRegex = DateFormatRegex
-            .ofRegex(helper.getParentPath().toString() + "/\\w{3}YYYYMMDD_HH.log").withFile(file);
+                .ofRegex(helper.getParentPath().toString() + "/\\w{3}YYYYMMDD_HH.log").withFile(file);
         dateFormatRegex.match();
         dateFormatRegex.getFormattedTime();
         Assert.assertEquals(helper.getParentPath().toString() + "/\\w{3}"
                         + AgentUtils.formatCurrentTime("yyyyMMdd_HH") + ".log",
-            dateFormatRegex.getFormattedRegex());
+                dateFormatRegex.getFormattedRegex());
+    }
+
+    @Test
+    public void testRegexAndTimeoffset() {
+        ZonedDateTime zoned = ZonedDateTime.now().plusDays(-1);
+        String pathTime = DateTimeFormatter.ofPattern("yyyyMMdd").withLocale(Locale.getDefault()).format(zoned);
+        File file = Paths.get(helper.getParentPath().toString(), pathTime.concat(".log")).toFile();
+        PathPattern entity = new PathPattern(helper.getParentPath().toString() + "/yyyyMMdd.log", "-1d");
+        boolean flag = entity.suitForWatch(file.getPath());
+        Assert.assertTrue(flag);
     }
 
     @Test
@@ -70,6 +87,8 @@ public class TestDateFormatRegex {
         JobProfile profile = new JobProfile();
         profile.set(JOB_DIR_FILTER_PATTERN, Paths.get(testPath.toString(), "YYYYMMDD_0").toString());
         profile.set(JOB_INSTANCE_ID, "test");
+        profile.set(JOB_GROUP_ID, "groupId");
+        profile.set(JOB_STREAM_ID, "streamId");
 
         List<Reader> readerList = source.split(profile);
         Assert.assertEquals(1, readerList.size());
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index d73c25933..30d7692d0 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -56,7 +56,9 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_COLLECT_TYP
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_GROUP_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
 
 @PowerMockIgnore({"javax.management.*", "javax.script.*", "com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*",
         "org.w3c.*"})
@@ -110,6 +112,8 @@ public class TestTextFileReader {
         jobConfiguration.set(JOB_INSTANCE_ID, "test");
         jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
         jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
+        jobConfiguration.set(JOB_GROUP_ID, "groupid");
+        jobConfiguration.set(JOB_STREAM_ID, "streamid");
         TextFileSource fileSource = new TextFileSource();
         List<Reader> readerList = fileSource.split(jobConfiguration);
         Assert.assertEquals(1, readerList.size());
@@ -139,6 +143,8 @@ public class TestTextFileReader {
         jobConfiguration.set(JOB_INSTANCE_ID, "test");
         jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
         jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
+        jobConfiguration.set(JOB_GROUP_ID, "groupid");
+        jobConfiguration.set(JOB_STREAM_ID, "streamid");
         jobConfiguration.set(JOB_FILE_COLLECT_TYPE, FileCollectType.FULL);
         jobConfiguration.set(JOB_FILE_LINE_END_PATTERN, "line-end-symbol");
         TextFileSource fileSource = new TextFileSource();
@@ -171,6 +177,8 @@ public class TestTextFileReader {
         jobConfiguration.set(JOB_INSTANCE_ID, "test");
         jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
         jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
+        jobConfiguration.set(JOB_GROUP_ID, "groupid");
+        jobConfiguration.set(JOB_STREAM_ID, "streamid");
         jobConfiguration.set(JOB_FILE_COLLECT_TYPE, FileCollectType.FULL);
         jobConfiguration.set(JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.INCREMENT);
         TextFileSource fileSource = new TextFileSource();
diff --git a/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json b/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json
index 3337510e2..4a2dd3d64 100644
--- a/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json
+++ b/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json
@@ -20,6 +20,8 @@
     "source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
     "sink": "org.apache.inlong.agent.plugin.sinks.MockSink",
     "channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
-    "pattern": "test"
+    "pattern": "test",
+    "groupId":  "test_20220913_221",
+    "streamId": "test_20220913_221"
   }
 }
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json b/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
index 8ca0e6519..49a425cce 100755
--- a/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
+++ b/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
@@ -17,6 +17,8 @@
     "source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
     "sink": "org.apache.inlong.agent.plugin.sinks.MockSink",
     "channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
-    "pattern": "test"
+    "pattern": "test",
+    "groupId":  "test_20220913_221",
+    "streamId": "test_20220913_221"
   }
 }
\ No newline at end of file