You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/06/28 04:43:39 UTC
[inlong] branch master updated: [INLONG-8347][Agent] Optimize the agent UT of testTimeOffset (#8348)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 50826912e2 [INLONG-8347][Agent] Optimize the agent UT of testTimeOffset (#8348)
50826912e2 is described below
commit 50826912e2eca7e5c2b7126f9ba465b5d6e996c4
Author: doleyzi <43...@users.noreply.github.com>
AuthorDate: Tue Jun 27 21:43:33 2023 -0700
[INLONG-8347][Agent] Optimize the agent UT of testTimeOffset (#8348)
---
.../org/apache/inlong/agent/plugin/MiniAgent.java | 55 ++++++++++++++--------
.../apache/inlong/agent/plugin/TestFileAgent.java | 53 +++++++++++++--------
.../inlong/agent/plugin/task/TestTextFileTask.java | 31 +++++++-----
3 files changed, 87 insertions(+), 52 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
index 113ea65465..0ccb8072e8 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
@@ -41,17 +42,19 @@ public class MiniAgent {
private static final Logger LOGGER = LoggerFactory.getLogger(MiniAgent.class);
private AgentManager manager;
private final LinkedBlockingQueue<JobProfile> queueJobs = new LinkedBlockingQueue<>(100);
- private List<TriggerProfile> triggerProfileCache = new ArrayList<>();
- private List<JobProfile> jobProfileCache = new ArrayList<>();
+ private final List<TriggerProfile> triggerProfileCache = Collections.synchronizedList(new ArrayList());
+ private final List<JobProfile> jobProfileCache = Collections.synchronizedList(new ArrayList());
/**
* Constructor of MiniAgent.
*/
public MiniAgent() throws Exception {
- AgentConfiguration conf = AgentConfiguration.getAgentConf();
- conf.setInt(AGENT_FETCH_CENTER_INTERVAL_SECONDS, 1);
- manager = new AgentManager();
- init();
+ synchronized (this) {
+ AgentConfiguration conf = AgentConfiguration.getAgentConf();
+ conf.setInt(AGENT_FETCH_CENTER_INTERVAL_SECONDS, 1);
+ manager = new AgentManager();
+ init();
+ }
}
private void init() throws Exception {
@@ -70,7 +73,9 @@ public class MiniAgent {
}
public void start() throws Exception {
- manager.start();
+ synchronized (this) {
+ manager.start();
+ }
}
public AgentManager getManager() {
@@ -78,35 +83,47 @@ public class MiniAgent {
}
public void stop() throws Exception {
- manager.stop();
+ synchronized (this) {
+ manager.stop();
+ }
}
public void restart() throws Exception {
- manager.stop();
- manager = new AgentManager();
- init();
- manager.start();
+ synchronized (this) {
+ manager.stop();
+ manager = new AgentManager();
+ init();
+ manager.start();
+ }
}
public void submitJob(JobProfile profile) {
manager.getJobManager().submitFileJobProfile(profile);
- jobProfileCache.add(profile);
+ synchronized (jobProfileCache) {
+ jobProfileCache.add(profile);
+ }
}
public void submitTrigger(TriggerProfile triggerProfile) {
manager.getTriggerManager().submitTrigger(triggerProfile, true);
- triggerProfileCache.add(triggerProfile);
+ synchronized (triggerProfileCache) {
+ triggerProfileCache.add(triggerProfile);
+ }
}
public void cleanupJobs() {
jobProfileCache.forEach(jobProfile -> manager.getJobManager().deleteJob(jobProfile.getInstanceId(), false));
- jobProfileCache.clear();
+ synchronized (jobProfileCache) {
+ jobProfileCache.clear();
+ }
}
public void cleanupTriggers() {
- triggerProfileCache
- .forEach(triggerProfile -> manager.getTriggerManager()
- .deleteTrigger(triggerProfile.getTriggerId(), false));
- triggerProfileCache.clear();
+ synchronized (triggerProfileCache) {
+ triggerProfileCache
+ .forEach(triggerProfile -> manager.getTriggerManager()
+ .deleteTrigger(triggerProfile.getTriggerId(), false));
+ triggerProfileCache.clear();
+ }
}
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 13df91dc97..39bdc18546 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -131,7 +131,9 @@ public class TestFileAgent {
profile.set(JOB_READ_WAIT_TIMEOUT, String.valueOf(readWaitTimeMilliseconds));
profile.set(PROXY_INLONG_GROUP_ID, "groupid");
profile.set(PROXY_INLONG_STREAM_ID, "streamid");
- agent.submitJob(profile);
+ synchronized (this) {
+ agent.submitJob(profile);
+ }
}
}
}
@@ -146,13 +148,15 @@ public class TestFileAgent {
triggerProfile.set(JOB_DIR_FILTER_PATTERNS, Paths.get(testRootDir.toString(),
"test.dat").toString());
triggerProfile.set(JOB_FILE_MAX_WAIT, "-1");
- agent.submitTrigger(triggerProfile);
- TestUtils.createHugeFiles("test.dat", testRootDir.toString(), RECORD);
- TestUtils.createHugeFiles("te1.dat", testRootDir.toString(), RECORD);
- await().atMost(10, TimeUnit.SECONDS).until(() -> {
- Map<String, JobWrapper> jobs = agent.getManager().getJobManager().getJobs();
- return jobs.size() == 1;
- });
+ synchronized (this) {
+ agent.submitTrigger(triggerProfile);
+ TestUtils.createHugeFiles("test.dat", testRootDir.toString(), RECORD);
+ TestUtils.createHugeFiles("te1.dat", testRootDir.toString(), RECORD);
+ await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ Map<String, JobWrapper> jobs = agent.getManager().getJobManager().getJobs();
+ return jobs.size() == 1;
+ });
+ }
}
@Test
@@ -165,11 +169,13 @@ public class TestFileAgent {
triggerProfile.set(JOB_FILE_MAX_WAIT, "-1");
triggerProfile.set(JOB_FILE_TRIGGER_TYPE, FileTriggerType.FULL);
triggerProfile.set(JOB_ID, "10");
- agent.submitTrigger(triggerProfile);
- await().atMost(10, TimeUnit.SECONDS).until(() -> {
- Map<String, JobWrapper> jobs = agent.getManager().getJobManager().getJobs();
- return jobs.size() == 1;
- });
+ synchronized (this) {
+ agent.submitTrigger(triggerProfile);
+ await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ Map<String, JobWrapper> jobs = agent.getManager().getJobManager().getJobs();
+ return jobs.size() == 1;
+ });
+ }
}
@Test
@@ -182,7 +188,9 @@ public class TestFileAgent {
profile.set(JOB_DIR_FILTER_PATTERNS, Paths.get(testRootDir.toString(),
"YYYYMMDD").toString());
profile.set(JOB_CYCLE_UNIT, "D");
- agent.submitTrigger(TriggerProfile.parseJobProfile(profile));
+ synchronized (this) {
+ agent.submitTrigger(TriggerProfile.parseJobProfile(profile));
+ }
}
}
createFiles(nowDate);
@@ -201,7 +209,9 @@ public class TestFileAgent {
profile.set(JOB_CYCLE_UNIT, "D");
profile.set(AGENT_MESSAGE_FILTER_CLASSNAME,
"org.apache.inlong.agent.plugin.filter.DefaultMessageFilter");
- agent.submitTrigger(TriggerProfile.parseJobProfile(profile));
+ synchronized (this) {
+ agent.submitTrigger(TriggerProfile.parseJobProfile(profile));
+ }
}
}
createFiles(nowDate);
@@ -219,7 +229,9 @@ public class TestFileAgent {
"YYYYMMDD").toString());
profile.set(JOB_FILE_TIME_OFFSET, "-1d");
profile.set(JOB_CYCLE_UNIT, "D");
- agent.submitTrigger(TriggerProfile.parseJobProfile(profile));
+ synchronized (this) {
+ agent.submitTrigger(TriggerProfile.parseJobProfile(profile));
+ }
}
}
createFiles(theDateBefore);
@@ -227,10 +239,11 @@ public class TestFileAgent {
}
private void assertJobSuccess() {
- JobProfile jobConf = agent.getManager().getJobManager().getJobConfDb().getJob(StateSearchKey.SUCCESS);
- if (jobConf != null) {
- Assert.assertEquals(1, jobConf.getInt("job.id"));
+ synchronized (this) {
+ JobProfile jobConf = agent.getManager().getJobManager().getJobConfDb().getJob(StateSearchKey.SUCCESS);
+ if (jobConf != null) {
+ Assert.assertEquals(1, jobConf.getInt("job.id"));
+ }
}
}
-
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
index 32a610fb10..0232edf0d4 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
@@ -56,6 +56,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -93,7 +94,7 @@ public class TestTextFileTask {
public static void setup() throws Exception {
atomicLong = new AtomicLong(0L);
atomicCountLong = new AtomicLong(0L);
- taskCache = new ArrayList<>();
+ taskCache = Collections.synchronizedList(new ArrayList());
TMP_FOLDER.create();
taskManager = new TaskManager(null);
@@ -116,22 +117,26 @@ public class TestTextFileTask {
@After
public void teardownEach() {
taskCache.forEach(taskManager::removeTask);
- taskCache.clear();
+ synchronized (taskCache) {
+ taskCache.clear();
+ }
}
public MockSink mockTextTask(JobProfile jobProfile) {
- List<Reader> readers = new TextFileSource().split(jobProfile);
- Channel channel = new MemoryChannel();
- MockSink sink = new MockSink();
+ synchronized (this) {
+ List<Reader> readers = new TextFileSource().split(jobProfile);
+ Channel channel = new MemoryChannel();
+ MockSink sink = new MockSink();
- readers.forEach(reader -> {
- String taskId = String.format("Text file read %s", reader.getReadSource());
- TaskWrapper taskWrapper =
- new TaskWrapper(taskManager, new Task(taskId, reader, sink, channel, jobProfile));
- taskManager.submitTask(taskWrapper);
- taskCache.add(taskId);
- });
- return sink;
+ readers.forEach(reader -> {
+ String taskId = String.format("Text file read %s", reader.getReadSource());
+ TaskWrapper taskWrapper =
+ new TaskWrapper(taskManager, new Task(taskId, reader, sink, channel, jobProfile));
+ taskManager.submitTask(taskWrapper);
+ taskCache.add(taskId);
+ });
+ return sink;
+ }
}
/**