You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/03/27 13:07:38 UTC

[incubator-inlong] branch master updated: [INLONG-3381][Agent] Agent wait one minute for dataproxy to prepare topic config

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 073d704  [INLONG-3381][Agent] Agent wait one minute for dataproxy to prepare topic config
073d704 is described below

commit 073d7040eabde1ec1a52e7e642971b25281e17ef
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Sun Mar 27 21:07:31 2022 +0800

    [INLONG-3381][Agent] Agent wait one minute for dataproxy to prepare topic config
---
 .../src/main/java/org/apache/inlong/agent/utils/AgentUtils.java   | 8 ++++++++
 .../main/java/org/apache/inlong/agent/core/task/TaskWrapper.java  | 2 ++
 .../test/java/org/apache/inlong/agent/task/TestTaskWrapper.java   | 2 +-
 .../apache/inlong/agent/plugin/sources/reader/BinlogReader.java   | 2 +-
 .../apache/inlong/agent/plugin/sources/reader/TextFileReader.java | 5 ++++-
 .../test/java/org/apache/inlong/agent/plugin/TestFileAgent.java   | 2 +-
 6 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index 5cccb32..e025401 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -211,6 +211,14 @@ public class AgentUtils {
         }
     }
 
+    public static void silenceSleepInMinute(long minutes) {
+        try {
+            TimeUnit.MINUTES.sleep(minutes);
+        } catch (Exception ignored) {
+            LOGGER.warn("silenceSleepInMs ", ignored);
+        }
+    }
+
     public static String parseHexStr(String delimiter) throws IllegalArgumentException {
         if (delimiter.trim().toLowerCase().startsWith(HEX_PREFIX)) {
             //only one char
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
index 8041c0c..e055543 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
@@ -47,6 +47,7 @@ public class TaskWrapper extends AbstractStateWrapper {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(TaskWrapper.class);
     public static final int WAIT_FINISH_TIME_OUT = 1;
+    public static final int WAIT_BEGIN_TIME_MINUTE = 1;
 
     private final TaskManager taskManager;
     private final Task task;
@@ -204,6 +205,7 @@ public class TaskWrapper extends AbstractStateWrapper {
     public void run() {
         try {
             LOGGER.info("start to run {}, retry time is {}", task.getTaskId(), retryTime.get());
+            AgentUtils.silenceSleepInMinute(WAIT_BEGIN_TIME_MINUTE);
             doChangeState(State.RUNNING);
             task.init();
             submitThreadsAndWait();
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
index 272e9b1..e3ea435 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
@@ -76,7 +76,7 @@ public class TestTaskWrapper {
             LOGGER.info("waiting for success");
             TimeUnit.MILLISECONDS.sleep(100);
         }
-        await().atMost(20, TimeUnit.SECONDS).until(()
+        await().atMost(80, TimeUnit.SECONDS).until(()
                 -> reader.getCount() == writer.getWriterCount() + 1);
         Assert.assertEquals("reader is not equals to writer",
                 reader.getCount(), writer.getWriterCount() + 1);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 54e15ed..138bb2a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -139,7 +139,7 @@ public class BinlogReader implements Reader {
         tableWhiteList = jobConf.get(JOB_TABLE_WHITELIST, "[\\s\\S]*.*");
         databaseWhiteList = jobConf.get(JOB_DATABASE_WHITELIST, "");
         serverTimeZone = jobConf.get(JOB_DATABASE_SERVER_TIME_ZONE, "");
-        offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "1000");
+        offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
         databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
             tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId();
         offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index c42a415..a42dd78 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -192,8 +192,11 @@ public class TextFileReader extends AbstractReader {
 
     @Override
     public void destroy() {
+        if (stream == null) {
+            return;
+        }
         AgentUtils.finallyClose(stream);
         LOGGER.info("destroy reader with read {} num {}",
-                streamMetric.getTagName(), streamMetric.getReadNum());
+            streamMetric.getTagName(), streamMetric.getReadNum());
     }
 }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 861c91b..fd1e4ad 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -149,7 +149,7 @@ public class TestFileAgent {
             jobs.forEach(
                     (s, jobWrapper) -> result.set(jobWrapper.getJob().getJobConf()
                             .get(JOB_DIR_FILTER_PATTERN).equals(testRootDir
-                                    + FileSystems.getDefault().getSeparator() + "test1.dat"))
+                                    + FileSystems.getDefault().getSeparator() + "test0.dat"))
             );
         }
         return result.get();