You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/21 02:36:39 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s (#5910)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 11cb26456 [INLONG-5909][Agent] Fix WatchService is no effect in the k8s (#5910)
11cb26456 is described below
commit 11cb26456f0079a519eaa1173ed3087649b8435e
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Wed Sep 21 10:28:27 2022 +0800
[INLONG-5909][Agent] Fix WatchService is no effect in the k8s (#5910)
---
.../apache/inlong/agent/constant/JobConstants.java | 2 +-
.../main/java/org/apache/inlong/agent/db/Db.java | 10 +++
.../org/apache/inlong/agent/db/JobProfileDb.java | 28 ++++++++
.../org/apache/inlong/agent/db/RocksDbImp.java | 17 +++++
.../apache/inlong/agent/pojo/JobProfileDto.java | 1 +
.../org/apache/inlong/agent/core/AgentManager.java | 6 +-
.../java/org/apache/inlong/agent/core/job/Job.java | 25 ++++++-
.../apache/inlong/agent/core/job/JobManager.java | 10 ++-
.../apache/inlong/agent/core/job/JobWrapper.java | 11 +++
.../org/apache/inlong/agent/core/task/Task.java | 4 ++
.../apache/inlong/agent/core/task/TaskManager.java | 4 ++
.../inlong/agent/core/trigger/TriggerManager.java | 83 +++++++++++++++++-----
.../agent/plugin/fetcher/ManagerFetcher.java | 26 +++++++
.../agent/plugin/filter/DateFormatRegex.java | 10 ++-
.../sources/reader/file/FileReaderOperator.java | 3 +
.../sources/reader/file/MonitorTextFile.java | 1 +
.../plugin/sources/reader/file/TextFileReader.java | 12 +++-
.../agent/plugin/trigger/DirectoryTrigger.java | 7 +-
.../inlong/agent/plugin/trigger/PathPattern.java | 1 +
.../inlong/agent/plugin/utils/PluginUtils.java | 7 +-
.../apache/inlong/agent/plugin/TestFileAgent.java | 8 +--
.../agent/plugin/filter/TestDateFormatRegex.java | 39 +++++++---
.../agent/plugin/sources/TestTextFileReader.java | 8 +++
.../src/test/resources/fileAgent.trigger.json | 4 +-
.../src/test/resources/fileAgentJob.json | 4 +-
25 files changed, 279 insertions(+), 52 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 1894d1507..5a8b01469 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -103,7 +103,7 @@ public class JobConstants extends CommonConstants {
public static final String JOB_READ_WAIT_TIMEOUT = "job.file.read.wait";
- public static final int DEFAULT_JOB_READ_WAIT_TIMEOUT = 100;
+ public static final int DEFAULT_JOB_READ_WAIT_TIMEOUT = 10;
public static final String JOB_ID_PREFIX = "job_";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
index 3218986f2..3efaa24db 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
@@ -76,6 +76,16 @@ public interface Db extends Closeable {
*/
List<KeyValueEntity> search(StateSearchKey searchKey);
+
+ /**
+ * search keyValue list whose state matches any of the given search keys.
+ *
+ * @param searchKeys search keys.
+ * @return key/value list
+ * @throws NullPointerException search keys should not be null.
+ */
+ List<KeyValueEntity> search(List<StateSearchKey> searchKeys);
+
/**
* search keyValue list by search key.
*
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
index a3ada2d1a..11f1739e4 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
@@ -24,7 +24,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
/**
* Wrapper for job conf persistence.
@@ -192,4 +198,26 @@ public class JobProfileDb {
}
return profileList;
}
+
+ /**
+ * check local job state from rocksDB.
+ *
+ * @return map whose key is the job id and whose value is the list of "state:profile json" strings of that job
+ */
+ public Map<String, List<String>> getJobsState() {
+ List<KeyValueEntity> entityList = db.search(Arrays.asList(StateSearchKey.values()));
+ Map<String, List<String>> jobStateMap = new HashMap<>();
+ for (KeyValueEntity entity : entityList) {
+ List<String> tmpList = new ArrayList<>();
+ JobProfile jobProfile = entity.getAsJobProfile();
+ String jobState = entity.getStateSearchKey().name().concat(":").concat(jobProfile.toJsonStr());
+ tmpList.add(jobState);
+ List<String> jobStates = jobStateMap.putIfAbsent(jobProfile.get(JOB_ID), tmpList);
+ if (Objects.nonNull(jobStates) && !jobStates.contains(jobState)) {
+ jobStates.addAll(tmpList);
+ jobStateMap.put(jobProfile.get(JOB_ID), jobStates);
+ }
+ }
+ return jobStateMap;
+ }
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
index 0db14cb3f..f2578b3bb 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
@@ -39,6 +39,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -242,6 +243,22 @@ public class RocksDbImp implements Db {
return results;
}
+ @Override
+ public List<KeyValueEntity> search(List<StateSearchKey> searchKeys) {
+ List<KeyValueEntity> results = new LinkedList<>();
+ try (final RocksIterator it = db.newIterator(columnHandlesMap.get(defaultFamilyName))) {
+ it.seekToFirst();
+ while (it.isValid()) {
+ KeyValueEntity keyValue = GSON.fromJson(new String(it.value()), KeyValueEntity.class);
+ if (Objects.nonNull(keyValue) && searchKeys.contains(keyValue.getStateSearchKey())) {
+ results.add(keyValue);
+ }
+ it.next();
+ }
+ }
+ return results;
+ }
+
@Override
public List<CommandEntity> searchCommands(boolean isAcked) {
List<CommandEntity> results = new LinkedList<>();
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index e1a1482f0..6d566c65c 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -119,6 +119,7 @@ public class JobProfileDto {
if (null != fileJobTaskConfig.getLineEndPattern()) {
FileJob.Line line = new Line();
line.setEndPattern(fileJobTaskConfig.getLineEndPattern());
+ fileJob.setLine(line);
}
if (null != fileJobTaskConfig.getEnvList()) {
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 8aaede49c..ad01243e1 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -111,7 +111,7 @@ public class AgentManager extends AbstractDaemon {
// db is a required component, so if not init correctly,
// throw exception and stop running.
return (Db) Class.forName(conf.get(
- AgentConstants.AGENT_DB_CLASSNAME, AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
+ AgentConstants.AGENT_DB_CLASSNAME, AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
.newInstance();
} catch (Exception ex) {
throw new UnsupportedClassVersionError(ex.getMessage());
@@ -126,6 +126,10 @@ public class AgentManager extends AbstractDaemon {
return db;
}
+ public JobProfileDb getJobProfileDb() {
+ return jobProfileDb;
+ }
+
public ProfileFetcher getFetcher() {
return fetcher;
}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index 9b2cf11e2..16e5b8f1f 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -37,13 +37,18 @@ import java.util.List;
public class Job {
private static final Logger LOGGER = LoggerFactory.getLogger(Job.class);
-
+ private static int COUNTER = 1;
private final JobProfile jobConf;
// job name
private String name;
// job description
private String description;
private String jobInstanceId;
+ private ThreadLocal<Integer> threadNum = new ThreadLocal<Integer>() {
+ protected Integer initialValue() {
+ return 0;
+ }
+ };
public Job(JobProfile jobConf) {
this.jobConf = jobConf;
@@ -83,8 +88,21 @@ public class Job {
* @return taskList
*/
public List<Task> createTasks() {
+ return getTasks(this.jobConf);
+ }
+
+ /**
+ * build task from job config
+ *
+ * @param jobConf subtask config in the job
+ * @return new task
+ */
+ public Task createTask(JobProfile jobConf) {
+ return getTasks(jobConf).isEmpty() ? null : createTasks().get(0);
+ }
+
+ private List<Task> getTasks(JobProfile jobConf) {
List<Task> taskList = new ArrayList<>();
- int index = 0;
try {
LOGGER.info("job id: {}, source: {}, channel: {}, sink: {}",
getJobInstanceId(), jobConf.get(JobConstants.JOB_SOURCE_CLASS),
@@ -95,7 +113,8 @@ public class Job {
Sink writer = (Sink) Class.forName(jobConf.get(JobConstants.JOB_SINK)).newInstance();
writer.setSourceName(reader.getReadSource());
Channel channel = (Channel) Class.forName(jobConf.get(JobConstants.JOB_CHANNEL)).newInstance();
- String taskId = String.format("%s_%d", jobInstanceId, index++);
+ String taskId = String.format("%s_%d", jobInstanceId, threadNum.get());
+ threadNum.set(threadNum.get() + COUNTER);
taskList.add(new Task(taskId, reader, writer, channel, getJobConf()));
}
} catch (Throwable ex) {
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index a5fb4fb21..f592097d8 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -28,6 +28,7 @@ import org.apache.inlong.agent.db.StateSearchKey;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.GsonUtil;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.metric.MetricRegister;
import org.slf4j.Logger;
@@ -114,16 +115,20 @@ public class JobManager extends AbstractDaemon {
* @param job job
*/
private void addJob(Job job) {
+ if (pendingJobs.containsKey(job.getJobInstanceId())) {
+ return;
+ }
try {
JobWrapper jobWrapper = new JobWrapper(agentManager, job);
- this.runningPool.execute(jobWrapper);
JobWrapper jobWrapperRet = jobs.putIfAbsent(jobWrapper.getJob().getJobInstanceId(), jobWrapper);
if (jobWrapperRet != null) {
LOGGER.warn("{} has been added to running pool, "
+ "cannot be added repeatedly", job.getJobInstanceId());
+ return;
} else {
getJobMetric().jobRunningCount.incrementAndGet();
}
+ this.runningPool.execute(jobWrapper);
} catch (Exception rje) {
LOGGER.debug("reject job {}", job.getJobInstanceId(), rje);
pendingJobs.putIfAbsent(job.getJobInstanceId(), job);
@@ -241,6 +246,9 @@ public class JobManager extends AbstractDaemon {
while (isRunnable()) {
try {
jobProfileDb.removeExpireJobs(jobDbCacheTime);
+ // TODO: the manager will handle these job states in the future; for now they are only kept locally.
+ Map<String, List<String>> jobStateMap = jobProfileDb.getJobsState();
+ LOGGER.info("check local job state: {}", GsonUtil.toJson(jobStateMap));
} catch (Exception ex) {
LOGGER.error("removeExpireJobs error caught", ex);
}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
index cd51aff40..160c7d685 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
@@ -18,6 +18,7 @@
package org.apache.inlong.agent.core.job;
import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.core.task.Task;
@@ -166,7 +167,17 @@ public class JobWrapper extends AbstractStateWrapper {
}));
}
+ /**
+ * get all running tasks
+ */
public List<Task> getAllTasks() {
return allTasks;
}
+
+ public synchronized void addTask(JobProfile jobProfile) {
+ Task task = job.createTask(jobProfile);
+ allTasks.add(task);
+ LOGGER.info("job name is {} and add new task, total task {}", job.getName(), allTasks.size());
+ taskManager.submitTask(task);
+ }
}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
index ad5249ec0..9ac2ed4d6 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
@@ -62,6 +62,10 @@ public class Task {
return channel;
}
+ public JobProfile getJobConf() {
+ return jobConf;
+ }
+
public void init() {
this.channel.init(jobConf);
this.sink.init(jobConf);
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index e6cfaf061..a08f8f0a2 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -128,6 +128,10 @@ public class TaskManager extends AbstractDaemon {
boolean notSubmitted = true;
while (notSubmitted) {
try {
+ if (this.runningPool.isShutdown()) {
+ LOGGER.error("submit task error because thread pool is closed");
+ break;
+ }
this.runningPool.submit(wrapper);
notSubmitted = false;
} catch (Exception ex) {
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
index 4b3b3b304..fa2308139 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
@@ -26,6 +26,9 @@ import org.apache.inlong.agent.constant.FileCollectType;
import org.apache.inlong.agent.constant.JobConstants;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.core.job.JobWrapper;
+import org.apache.inlong.agent.core.task.Task;
+import org.apache.inlong.agent.db.JobProfileDb;
+import org.apache.inlong.agent.db.StateSearchKey;
import org.apache.inlong.agent.db.TriggerProfileDb;
import org.apache.inlong.agent.plugin.Trigger;
import org.apache.inlong.agent.utils.ThreadUtils;
@@ -34,11 +37,14 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM;
import static org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
import static org.apache.inlong.agent.constant.JobConstants.TRIGGER_ONLY_ONE_JOB;
@@ -86,7 +92,7 @@ public class TriggerManager extends AbstractDaemon {
trigger.init(triggerProfile);
trigger.run();
} catch (Throwable ex) {
- LOGGER.error("exception caught", ex);
+ LOGGER.error("add trigger error: ", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
return false;
}
@@ -126,7 +132,7 @@ public class TriggerManager extends AbstractDaemon {
if (FileCollectType.INCREMENT.equals(syncType)) {
return;
}
- LOGGER.info("initialize submit full path. trigger {} ", profile.getTriggerId());
+ LOGGER.info("initialize submit full sync trigger {}", profile.getTriggerId());
manager.getJobManager().submitFileJobProfile(profile);
}
@@ -141,33 +147,75 @@ public class TriggerManager extends AbstractDaemon {
if (triggerProfile.getBoolean(TRIGGER_ONLY_ONE_JOB, false)) {
deleteRelatedJobs(triggerProfile.getTriggerId());
}
- manager.getJobManager().submitFileJobProfile(profile);
- addToTriggerMap(profile.get(JOB_ID), profile);
+ Map<String, JobWrapper> jobWrapperMap = manager.getJobManager().getJobs();
+ // running job then add new task
+ if (isRunningJob(profile, jobWrapperMap)) {
+ jobWrapperMap.get(profile.getInstanceId()).addTask(profile);
+ }
+ // not running job then add job
+ if (isExistJob(profile)) {
+ manager.getJobManager().submitFileJobProfile(profile);
+ addToTriggerMap(profile.get(JOB_ID), profile);
+ }
}
});
TimeUnit.SECONDS.sleep(triggerFetchInterval);
} catch (Throwable e) {
LOGGER.info("ignored Exception ", e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
-
}
}
-
};
}
+ private boolean isRunningJob(JobProfile profile, Map<String, JobWrapper> jobWrapperMap) {
+ JobWrapper jobWrapper;
+ try {
+ jobWrapper = jobWrapperMap.get(profile.getInstanceId());
+ } catch (Exception e) {
+ LOGGER.warn("get jobWrapper error: ", e);
+ return false;
+ }
+ List<Task> tasks = jobWrapper.getAllTasks();
+ for (Task task : tasks) {
+ JobProfile runJobProfile = task.getJobConf();
+ if (Objects.equals(runJobProfile.get(JOB_DIR_FILTER_PATTERN), profile.get(JOB_DIR_FILTER_PATTERN))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isExistJob(JobProfile profile) {
+ List<JobProfile> jobProfileList = getJobProfiles();
+ AtomicBoolean isExist = new AtomicBoolean(false);
+ jobProfileList.forEach(job -> {
+ if (profile.get(JOB_ID).equals(job.get(JOB_ID))) {
+ isExist.set(true);
+ }
+ });
+ return !isExist.get();
+ }
+
/**
* delete jobs generated by the trigger
*/
private void deleteRelatedJobs(String triggerId) {
LOGGER.info("start to delete related jobs in triggerId {}", triggerId);
- ConcurrentHashMap<String, JobProfile> jobProfiles =
- triggerJobMap.get(triggerId);
- if (jobProfiles != null) {
- LOGGER.info("trigger can only run one job, stop the others {}", jobProfiles.keySet());
- jobProfiles.keySet().forEach(this::deleteJob);
- triggerJobMap.remove(triggerId);
- }
+ List<JobProfile> jobProfileList = getJobProfiles();
+ jobProfileList.forEach(jobProfile -> {
+ if (Objects.equals(jobProfile.get(JOB_ID), triggerId)) {
+ deleteJob(jobProfile.getInstanceId());
+ }
+ });
+ triggerJobMap.remove(triggerId);
+ }
+
+ private List<JobProfile> getJobProfiles() {
+ JobProfileDb jobProfileDb = manager.getJobProfileDb();
+ List<JobProfile> jobProfileList = jobProfileDb.getJobsByState(StateSearchKey.RUNNING);
+ jobProfileList.addAll(jobProfileDb.getJobsByState(StateSearchKey.ACCEPTED));
+ return jobProfileList;
}
private void deleteJob(String jobInstanceId) {
@@ -180,8 +228,7 @@ public class TriggerManager extends AbstractDaemon {
try {
triggerJobMap.forEach((s, jobProfiles) -> {
for (String jobId : jobProfiles.keySet()) {
- Map<String, JobWrapper> jobs =
- manager.getJobManager().getJobs();
+ Map<String, JobWrapper> jobs = manager.getJobManager().getJobs();
if (jobs.get(jobId) == null) {
triggerJobMap.remove(jobId);
}
@@ -201,10 +248,8 @@ public class TriggerManager extends AbstractDaemon {
* need to put profile in triggerJobMap
*/
private void addToTriggerMap(String triggerId, JobProfile profile) {
- ConcurrentHashMap<String, JobProfile> tmpList =
- new ConcurrentHashMap<>();
- ConcurrentHashMap<String, JobProfile> jobWrappers =
- triggerJobMap.putIfAbsent(triggerId, tmpList);
+ ConcurrentHashMap<String, JobProfile> tmpList = new ConcurrentHashMap<>();
+ ConcurrentHashMap<String, JobProfile> jobWrappers = triggerJobMap.putIfAbsent(triggerId, tmpList);
if (jobWrappers == null) {
jobWrappers = tmpList;
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 7eab6d2f4..cc1b4a9e2 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -30,6 +30,8 @@ import org.apache.inlong.agent.conf.ProfileFetcher;
import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.db.CommandDb;
+import org.apache.inlong.agent.db.JobProfileDb;
+import org.apache.inlong.agent.db.StateSearchKey;
import org.apache.inlong.agent.plugin.Trigger;
import org.apache.inlong.agent.plugin.utils.PluginUtils;
import org.apache.inlong.agent.pojo.ConfirmAgentIpRequest;
@@ -55,7 +57,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
@@ -85,6 +89,7 @@ import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MA
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_VIP_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.VERSION;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_OP;
import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
@@ -287,6 +292,9 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
}
for (DataConfig dataConfig : taskResult.getDataConfigs()) {
TriggerProfile profile = TriggerProfile.getTriggerProfiles(dataConfig);
+ if (triggerIsRunning(profile)) {
+ continue;
+ }
LOGGER.info("the triggerProfile: {}", profile.toJsonStr());
if (profile.hasKey(JOB_TRIGGER)) {
dealWithTdmTriggerProfile(profile);
@@ -300,6 +308,24 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
}
}
+ private boolean triggerIsRunning(TriggerProfile newProfile) {
+ int type = ManagerOpEnum.getOpType(newProfile.getInt(JOB_OP)).getType();
+ if (ManagerOpEnum.ACTIVE.getType() != type || ManagerOpEnum.ADD.getType() != type) {
+ return false;
+ }
+ JobProfileDb jobProfileDb = agentManager.getJobProfileDb();
+ List<JobProfile> jobsByState = jobProfileDb.getJobsByState(StateSearchKey.ACCEPTED);
+ jobsByState.addAll(jobProfileDb.getJobsByState(StateSearchKey.RUNNING));
+ AtomicBoolean jobIsRunning = new AtomicBoolean(false);
+ jobsByState.forEach(jobProfile -> {
+ if (Objects.equals(jobProfile.get(JOB_ID), newProfile.get(JOB_ID))) {
+ LOGGER.error("job is running or accepted, {} submit failed", newProfile.get(JOB_ID));
+ jobIsRunning.set(true);
+ }
+ });
+ return jobIsRunning.get();
+ }
+
/**
* form file command fetch request
*/
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java
index ceaaacf73..1ca91e81f 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java
@@ -35,9 +35,12 @@ public class DateFormatRegex implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(DateFormatRegex.class);
private static final String YEAR = "YYYY";
+ private static final String YEAR_LOWERCASE = "yyyy";
private static final String MONTH = "MM";
private static final String DAY = "DD";
+ private static final String DAY_LOWERCASE = "dd";
private static final String HOUR = "HH";
+ private static final String MINUTE = "mm";
private static final String NORMAL_FORMATTER = "yyyyMMddHHmm";
@@ -104,11 +107,14 @@ public class DateFormatRegex implements Filter {
File.separator, 0);
List<String> formattedList = new ArrayList<>();
for (String regexStr : regexList) {
- if (regexStr.contains(YEAR)) {
+ if (regexStr.contains(YEAR) || regexStr.contains(YEAR_LOWERCASE)) {
String tmpRegexStr = regexStr.replace(YEAR, time.substring(0, 4))
+ .replace(YEAR_LOWERCASE,time.substring(0, 4))
.replace(MONTH, time.substring(4, 6))
.replace(DAY, time.substring(6, 8))
- .replace(HOUR, time.substring(8, 10));
+ .replace(DAY_LOWERCASE,time.substring(6, 8))
+ .replace(HOUR, time.substring(8, 10))
+ .replace(MINUTE,time.substring(10));
formattedList.add(tmpRegexStr);
formattedTime = time;
} else {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 2ff0bf189..d9a9f5451 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -98,6 +98,9 @@ public class FileReaderOperator extends AbstractReader {
}
private boolean validateMessage(String message) {
+ if (StringUtils.isBlank(message)) {
+ return false;
+ }
if (validators.isEmpty()) {
return true;
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index 0c6f6a72e..d8160bd8b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -106,6 +106,7 @@ public final class MonitorTextFile {
public void run() {
try {
TimeUnit.SECONDS.sleep(WAIT_TIME);
+ LOGGER.info("start {} monitor", this.fileReaderOperator.file.getAbsolutePath());
while (!this.fileReaderOperator.finished) {
long expireTime = Long.parseLong(fileReaderOperator.jobConf
.get(JOB_FILE_MONITOR_EXPIRE, JOB_FILE_MONITOR_DEFAULT_EXPIRE));
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
index bda3a5436..cf9707e19 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
@@ -58,7 +58,8 @@ public final class TextFileReader extends AbstractFileReader {
List<String> lines = Files.newBufferedReader(fileReaderOperator.file.toPath()).lines().skip(
fileReaderOperator.position)
.collect(Collectors.toList());
- LOGGER.info("path is {}, data reads size {}", fileReaderOperator.file.getName(), lines.size());
+ LOGGER.info("path is {}, position is {}, data reads size {}", fileReaderOperator.file.getName(),
+ fileReaderOperator.position, lines.size());
List<String> resultLines = new ArrayList<>();
//TODO line regular expression matching
if (fileReaderOperator.jobConf.hasKey(JOB_FILE_LINE_END_PATTERN)) {
@@ -70,7 +71,8 @@ public final class TextFileReader extends AbstractFileReader {
String data = lineStringBuffer.get(fileReaderOperator.file);
Matcher matcher = pattern.matcher(data);
if (matcher.find() && StringUtils.isNoneBlank(matcher.group())) {
- String[] splitLines = data.split(matcher.group());
+ String splitStr = matcher.group();
+ String[] splitLines = data.split(splitStr);
int length = splitLines.length;
for (int i = 0; i < length; i++) {
if (i > 0 && i == length - 1 && null != splitLines[i]) {
@@ -79,6 +81,12 @@ public final class TextFileReader extends AbstractFileReader {
}
resultLines.add(splitLines[i].trim());
}
+ // handle the case where the data ends with the delimiter.
+ // for example: line end pattern is ab{2} and line string is cabbdabbfabb
+ if (data.startsWith(splitStr, data.length() - splitStr.length() - 1)) {
+ length = 1;
+ resultLines.add(lineStringBuffer.get(fileReaderOperator.file));
+ }
if (1 == length) {
lineStringBuffer.remove(fileReaderOperator.file);
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
index 222969dd3..92a3d2406 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
@@ -46,6 +46,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
+
/**
* Watch directory, if new valid files are created, create jobs correspondingly.
*/
@@ -272,9 +274,8 @@ public class DirectoryTrigger extends AbstractDaemon implements Trigger {
interval = profile.getInt(
AgentConstants.TRIGGER_CHECK_INTERVAL, AgentConstants.DEFAULT_TRIGGER_CHECK_INTERVAL);
this.profile = profile;
-
- if (this.profile.hasKey(JobConstants.JOB_DIR_FILTER_PATTERN)) {
- String pathPattern = this.profile.get(JobConstants.JOB_DIR_FILTER_PATTERN);
+ if (this.profile.hasKey(JOB_DIR_FILTER_PATTERN)) {
+ String pathPattern = this.profile.get(JOB_DIR_FILTER_PATTERN);
String timeOffset = this.profile.get(JobConstants.JOB_FILE_TIME_OFFSET, "");
if (timeOffset.isEmpty()) {
register(pathPattern);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java
index dda014e32..701de9067 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java
@@ -59,6 +59,7 @@ public class PathPattern {
rootDir = findRoot(watchDir);
subDirs = new HashSet<>();
dateFormatRegex = DateFormatRegex.ofRegex(watchDir).withOffset(offset);
+ updateDateFormatRegex();
}
/**
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
index 6855dc9fd..1fc8a68a9 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
@@ -43,6 +43,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.AGENT_OS_NAME;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM;
import static org.apache.inlong.agent.constant.CommonConstants.FILE_MAX_NUM;
import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TIME_OFFSET;
import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
/**
@@ -78,7 +79,9 @@ public class PluginUtils {
public static Collection<File> findSuitFiles(JobProfile jobConf) {
String dirPattern = jobConf.get(JOB_DIR_FILTER_PATTERN);
LOGGER.info("start to find files with dir pattern {}", dirPattern);
- PathPattern pattern = new PathPattern(dirPattern);
+ PathPattern pattern =
+ jobConf.hasKey(JOB_FILE_TIME_OFFSET) ? new PathPattern(dirPattern, jobConf.get(JOB_FILE_TIME_OFFSET))
+ : new PathPattern(dirPattern);
updateRetryTime(jobConf, pattern);
int maxFileNum = jobConf.getInt(FILE_MAX_NUM, DEFAULT_FILE_MAX_NUM);
LOGGER.info("dir pattern {}, max file num {}", dirPattern, maxFileNum);
@@ -132,7 +135,7 @@ public class PluginUtils {
allIps.add(InetAddress.getLocalHost().getHostAddress());
}
} catch (Exception e) {
- LOGGER.error("get local ip list fail with ex {} ", e);
+ LOGGER.error("get local ip list fail with ex:", e);
}
return allIps;
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 6461bef64..27bb0dc10 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -55,6 +55,7 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTE
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_COLLECT_TYPE;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TIME_OFFSET;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
import static org.awaitility.Awaitility.await;
@@ -161,21 +162,16 @@ public class TestFileAgent {
public void testOneJobFullPath() throws Exception {
URI uri = Objects.requireNonNull(getClass().getClassLoader().getResource("test")).toURI();
String path = Paths.get(uri).toString();
- String fileName = path + "/increment_test.txt";
- TestUtils.deleteFile(fileName);
-
String jsonString = TestUtils.getTestTriggerProfile();
TriggerProfile triggerProfile = TriggerProfile.parseJsonStr(jsonString);
triggerProfile.set(JOB_DIR_FILTER_PATTERN, path);
triggerProfile.set(JOB_FILE_MAX_WAIT, "-1");
triggerProfile.set(JOB_FILE_COLLECT_TYPE, FileCollectType.FULL);
+ triggerProfile.set(JOB_ID, "2");
TriggerManager triggerManager = agent.getManager().getTriggerManager();
triggerManager.submitTrigger(triggerProfile);
Thread.sleep(2000);
Assert.assertEquals(3L, checkFullPathReadJob().longValue());
- TestUtils.createFile(fileName);
- Thread.sleep(10000);
- TestUtils.deleteFile(fileName);
}
private boolean checkOnlyOneJob() {
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java
index 6e7a7ed2b..6a17849db 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java
@@ -17,23 +17,30 @@
package org.apache.inlong.agent.plugin.filter;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-
-import java.io.File;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.sources.TextFileSource;
+import org.apache.inlong.agent.plugin.trigger.PathPattern;
import org.apache.inlong.agent.utils.AgentUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Locale;
+
+import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_GROUP_ID;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
+
public class TestDateFormatRegex {
private static AgentBaseTestsHelper helper;
@@ -54,12 +61,22 @@ public class TestDateFormatRegex {
public void testRegex() {
File file = Paths.get(helper.getParentPath().toString(), "aad20201201_11.log").toFile();
DateFormatRegex dateFormatRegex = DateFormatRegex
- .ofRegex(helper.getParentPath().toString() + "/\\w{3}YYYYMMDD_HH.log").withFile(file);
+ .ofRegex(helper.getParentPath().toString() + "/\\w{3}YYYYMMDD_HH.log").withFile(file);
dateFormatRegex.match();
dateFormatRegex.getFormattedTime();
Assert.assertEquals(helper.getParentPath().toString() + "/\\w{3}"
+ AgentUtils.formatCurrentTime("yyyyMMdd_HH") + ".log",
- dateFormatRegex.getFormattedRegex());
+ dateFormatRegex.getFormattedRegex());
+ }
+
+ @Test
+ public void testRegexAndTimeoffset() {
+ ZonedDateTime zoned = ZonedDateTime.now().plusDays(-1);
+ String pathTime = DateTimeFormatter.ofPattern("yyyyMMdd").withLocale(Locale.getDefault()).format(zoned);
+ File file = Paths.get(helper.getParentPath().toString(), pathTime.concat(".log")).toFile();
+ PathPattern entity = new PathPattern(helper.getParentPath().toString() + "/yyyyMMdd.log", "-1d");
+ boolean flag = entity.suitForWatch(file.getPath());
+ Assert.assertTrue(flag);
}
@Test
@@ -70,6 +87,8 @@ public class TestDateFormatRegex {
JobProfile profile = new JobProfile();
profile.set(JOB_DIR_FILTER_PATTERN, Paths.get(testPath.toString(), "YYYYMMDD_0").toString());
profile.set(JOB_INSTANCE_ID, "test");
+ profile.set(JOB_GROUP_ID, "groupId");
+ profile.set(JOB_STREAM_ID, "streamId");
List<Reader> readerList = source.split(profile);
Assert.assertEquals(1, readerList.size());
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index d73c25933..30d7692d0 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -56,7 +56,9 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_COLLECT_TYP
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_GROUP_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
@PowerMockIgnore({"javax.management.*", "javax.script.*", "com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*",
"org.w3c.*"})
@@ -110,6 +112,8 @@ public class TestTextFileReader {
jobConfiguration.set(JOB_INSTANCE_ID, "test");
jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
+ jobConfiguration.set(JOB_GROUP_ID, "groupid");
+ jobConfiguration.set(JOB_STREAM_ID, "streamid");
TextFileSource fileSource = new TextFileSource();
List<Reader> readerList = fileSource.split(jobConfiguration);
Assert.assertEquals(1, readerList.size());
@@ -139,6 +143,8 @@ public class TestTextFileReader {
jobConfiguration.set(JOB_INSTANCE_ID, "test");
jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
+ jobConfiguration.set(JOB_GROUP_ID, "groupid");
+ jobConfiguration.set(JOB_STREAM_ID, "streamid");
jobConfiguration.set(JOB_FILE_COLLECT_TYPE, FileCollectType.FULL);
jobConfiguration.set(JOB_FILE_LINE_END_PATTERN, "line-end-symbol");
TextFileSource fileSource = new TextFileSource();
@@ -171,6 +177,8 @@ public class TestTextFileReader {
jobConfiguration.set(JOB_INSTANCE_ID, "test");
jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
+ jobConfiguration.set(JOB_GROUP_ID, "groupid");
+ jobConfiguration.set(JOB_STREAM_ID, "streamid");
jobConfiguration.set(JOB_FILE_COLLECT_TYPE, FileCollectType.FULL);
jobConfiguration.set(JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.INCREMENT);
TextFileSource fileSource = new TextFileSource();
diff --git a/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json b/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json
index 3337510e2..4a2dd3d64 100644
--- a/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json
+++ b/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json
@@ -20,6 +20,8 @@
"source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
"sink": "org.apache.inlong.agent.plugin.sinks.MockSink",
"channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
- "pattern": "test"
+ "pattern": "test",
+ "groupId": "test_20220913_221",
+ "streamId": "test_20220913_221"
}
}
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json b/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
index 8ca0e6519..49a425cce 100755
--- a/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
+++ b/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
@@ -17,6 +17,8 @@
"source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
"sink": "org.apache.inlong.agent.plugin.sinks.MockSink",
"channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
- "pattern": "test"
+ "pattern": "test",
+ "groupId": "test_20220913_221",
+ "streamId": "test_20220913_221"
}
}
\ No newline at end of file