You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/06/28 04:43:39 UTC

[inlong] branch master updated: [INLONG-8347][Agent] Optimize the agent UT of testTimeOffset (#8348)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 50826912e2 [INLONG-8347][Agent] Optimize the agent UT of testTimeOffset (#8348)
50826912e2 is described below

commit 50826912e2eca7e5c2b7126f9ba465b5d6e996c4
Author: doleyzi <43...@users.noreply.github.com>
AuthorDate: Tue Jun 27 21:43:33 2023 -0700

    [INLONG-8347][Agent] Optimize the agent UT of testTimeOffset (#8348)
---
 .../org/apache/inlong/agent/plugin/MiniAgent.java  | 55 ++++++++++++++--------
 .../apache/inlong/agent/plugin/TestFileAgent.java  | 53 +++++++++++++--------
 .../inlong/agent/plugin/task/TestTextFileTask.java | 31 +++++++-----
 3 files changed, 87 insertions(+), 52 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
index 113ea65465..0ccb8072e8 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -41,17 +42,19 @@ public class MiniAgent {
     private static final Logger LOGGER = LoggerFactory.getLogger(MiniAgent.class);
     private AgentManager manager;
     private final LinkedBlockingQueue<JobProfile> queueJobs = new LinkedBlockingQueue<>(100);
-    private List<TriggerProfile> triggerProfileCache = new ArrayList<>();
-    private List<JobProfile> jobProfileCache = new ArrayList<>();
+    private final List<TriggerProfile> triggerProfileCache = Collections.synchronizedList(new ArrayList());
+    private final List<JobProfile> jobProfileCache = Collections.synchronizedList(new ArrayList());
 
     /**
      * Constructor of MiniAgent.
      */
     public MiniAgent() throws Exception {
-        AgentConfiguration conf = AgentConfiguration.getAgentConf();
-        conf.setInt(AGENT_FETCH_CENTER_INTERVAL_SECONDS, 1);
-        manager = new AgentManager();
-        init();
+        synchronized (this) {
+            AgentConfiguration conf = AgentConfiguration.getAgentConf();
+            conf.setInt(AGENT_FETCH_CENTER_INTERVAL_SECONDS, 1);
+            manager = new AgentManager();
+            init();
+        }
     }
 
     private void init() throws Exception {
@@ -70,7 +73,9 @@ public class MiniAgent {
     }
 
     public void start() throws Exception {
-        manager.start();
+        synchronized (this) {
+            manager.start();
+        }
     }
 
     public AgentManager getManager() {
@@ -78,35 +83,47 @@ public class MiniAgent {
     }
 
     public void stop() throws Exception {
-        manager.stop();
+        synchronized (this) {
+            manager.stop();
+        }
     }
 
     public void restart() throws Exception {
-        manager.stop();
-        manager = new AgentManager();
-        init();
-        manager.start();
+        synchronized (this) {
+            manager.stop();
+            manager = new AgentManager();
+            init();
+            manager.start();
+        }
     }
 
     public void submitJob(JobProfile profile) {
         manager.getJobManager().submitFileJobProfile(profile);
-        jobProfileCache.add(profile);
+        synchronized (jobProfileCache) {
+            jobProfileCache.add(profile);
+        }
     }
 
     public void submitTrigger(TriggerProfile triggerProfile) {
         manager.getTriggerManager().submitTrigger(triggerProfile, true);
-        triggerProfileCache.add(triggerProfile);
+        synchronized (triggerProfileCache) {
+            triggerProfileCache.add(triggerProfile);
+        }
     }
 
     public void cleanupJobs() {
         jobProfileCache.forEach(jobProfile -> manager.getJobManager().deleteJob(jobProfile.getInstanceId(), false));
-        jobProfileCache.clear();
+        synchronized (jobProfileCache) {
+            jobProfileCache.clear();
+        }
     }
 
     public void cleanupTriggers() {
-        triggerProfileCache
-                .forEach(triggerProfile -> manager.getTriggerManager()
-                        .deleteTrigger(triggerProfile.getTriggerId(), false));
-        triggerProfileCache.clear();
+        synchronized (triggerProfileCache) {
+            triggerProfileCache
+                    .forEach(triggerProfile -> manager.getTriggerManager()
+                            .deleteTrigger(triggerProfile.getTriggerId(), false));
+            triggerProfileCache.clear();
+        }
     }
 }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 13df91dc97..39bdc18546 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -131,7 +131,9 @@ public class TestFileAgent {
                 profile.set(JOB_READ_WAIT_TIMEOUT, String.valueOf(readWaitTimeMilliseconds));
                 profile.set(PROXY_INLONG_GROUP_ID, "groupid");
                 profile.set(PROXY_INLONG_STREAM_ID, "streamid");
-                agent.submitJob(profile);
+                synchronized (this) {
+                    agent.submitJob(profile);
+                }
             }
         }
     }
@@ -146,13 +148,15 @@ public class TestFileAgent {
         triggerProfile.set(JOB_DIR_FILTER_PATTERNS, Paths.get(testRootDir.toString(),
                 "test.dat").toString());
         triggerProfile.set(JOB_FILE_MAX_WAIT, "-1");
-        agent.submitTrigger(triggerProfile);
-        TestUtils.createHugeFiles("test.dat", testRootDir.toString(), RECORD);
-        TestUtils.createHugeFiles("te1.dat", testRootDir.toString(), RECORD);
-        await().atMost(10, TimeUnit.SECONDS).until(() -> {
-            Map<String, JobWrapper> jobs = agent.getManager().getJobManager().getJobs();
-            return jobs.size() == 1;
-        });
+        synchronized (this) {
+            agent.submitTrigger(triggerProfile);
+            TestUtils.createHugeFiles("test.dat", testRootDir.toString(), RECORD);
+            TestUtils.createHugeFiles("te1.dat", testRootDir.toString(), RECORD);
+            await().atMost(10, TimeUnit.SECONDS).until(() -> {
+                Map<String, JobWrapper> jobs = agent.getManager().getJobManager().getJobs();
+                return jobs.size() == 1;
+            });
+        }
     }
 
     @Test
@@ -165,11 +169,13 @@ public class TestFileAgent {
         triggerProfile.set(JOB_FILE_MAX_WAIT, "-1");
         triggerProfile.set(JOB_FILE_TRIGGER_TYPE, FileTriggerType.FULL);
         triggerProfile.set(JOB_ID, "10");
-        agent.submitTrigger(triggerProfile);
-        await().atMost(10, TimeUnit.SECONDS).until(() -> {
-            Map<String, JobWrapper> jobs = agent.getManager().getJobManager().getJobs();
-            return jobs.size() == 1;
-        });
+        synchronized (this) {
+            agent.submitTrigger(triggerProfile);
+            await().atMost(10, TimeUnit.SECONDS).until(() -> {
+                Map<String, JobWrapper> jobs = agent.getManager().getJobManager().getJobs();
+                return jobs.size() == 1;
+            });
+        }
     }
 
     @Test
@@ -182,7 +188,9 @@ public class TestFileAgent {
                 profile.set(JOB_DIR_FILTER_PATTERNS, Paths.get(testRootDir.toString(),
                         "YYYYMMDD").toString());
                 profile.set(JOB_CYCLE_UNIT, "D");
-                agent.submitTrigger(TriggerProfile.parseJobProfile(profile));
+                synchronized (this) {
+                    agent.submitTrigger(TriggerProfile.parseJobProfile(profile));
+                }
             }
         }
         createFiles(nowDate);
@@ -201,7 +209,9 @@ public class TestFileAgent {
                 profile.set(JOB_CYCLE_UNIT, "D");
                 profile.set(AGENT_MESSAGE_FILTER_CLASSNAME,
                         "org.apache.inlong.agent.plugin.filter.DefaultMessageFilter");
-                agent.submitTrigger(TriggerProfile.parseJobProfile(profile));
+                synchronized (this) {
+                    agent.submitTrigger(TriggerProfile.parseJobProfile(profile));
+                }
             }
         }
         createFiles(nowDate);
@@ -219,7 +229,9 @@ public class TestFileAgent {
                         "YYYYMMDD").toString());
                 profile.set(JOB_FILE_TIME_OFFSET, "-1d");
                 profile.set(JOB_CYCLE_UNIT, "D");
-                agent.submitTrigger(TriggerProfile.parseJobProfile(profile));
+                synchronized (this) {
+                    agent.submitTrigger(TriggerProfile.parseJobProfile(profile));
+                }
             }
         }
         createFiles(theDateBefore);
@@ -227,10 +239,11 @@ public class TestFileAgent {
     }
 
     private void assertJobSuccess() {
-        JobProfile jobConf = agent.getManager().getJobManager().getJobConfDb().getJob(StateSearchKey.SUCCESS);
-        if (jobConf != null) {
-            Assert.assertEquals(1, jobConf.getInt("job.id"));
+        synchronized (this) {
+            JobProfile jobConf = agent.getManager().getJobManager().getJobConfDb().getJob(StateSearchKey.SUCCESS);
+            if (jobConf != null) {
+                Assert.assertEquals(1, jobConf.getInt("job.id"));
+            }
         }
     }
-
 }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
index 32a610fb10..0232edf0d4 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
@@ -56,6 +56,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -93,7 +94,7 @@ public class TestTextFileTask {
     public static void setup() throws Exception {
         atomicLong = new AtomicLong(0L);
         atomicCountLong = new AtomicLong(0L);
-        taskCache = new ArrayList<>();
+        taskCache = Collections.synchronizedList(new ArrayList());
         TMP_FOLDER.create();
 
         taskManager = new TaskManager(null);
@@ -116,22 +117,26 @@ public class TestTextFileTask {
     @After
     public void teardownEach() {
         taskCache.forEach(taskManager::removeTask);
-        taskCache.clear();
+        synchronized (taskCache) {
+            taskCache.clear();
+        }
     }
 
     public MockSink mockTextTask(JobProfile jobProfile) {
-        List<Reader> readers = new TextFileSource().split(jobProfile);
-        Channel channel = new MemoryChannel();
-        MockSink sink = new MockSink();
+        synchronized (this) {
+            List<Reader> readers = new TextFileSource().split(jobProfile);
+            Channel channel = new MemoryChannel();
+            MockSink sink = new MockSink();
 
-        readers.forEach(reader -> {
-            String taskId = String.format("Text file read %s", reader.getReadSource());
-            TaskWrapper taskWrapper =
-                    new TaskWrapper(taskManager, new Task(taskId, reader, sink, channel, jobProfile));
-            taskManager.submitTask(taskWrapper);
-            taskCache.add(taskId);
-        });
-        return sink;
+            readers.forEach(reader -> {
+                String taskId = String.format("Text file read %s", reader.getReadSource());
+                TaskWrapper taskWrapper =
+                        new TaskWrapper(taskManager, new Task(taskId, reader, sink, channel, jobProfile));
+                taskManager.submitTask(taskWrapper);
+                taskCache.add(taskId);
+            });
+            return sink;
+        }
     }
 
     /**