You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/03/27 13:07:38 UTC
[incubator-inlong] branch master updated: [INLONG-3381][Agent] Agent wait one minute for dataproxy to prepare topic config
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 073d704 [INLONG-3381][Agent] Agent wait one minute for dataproxy to prepare topic config
073d704 is described below
commit 073d7040eabde1ec1a52e7e642971b25281e17ef
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Sun Mar 27 21:07:31 2022 +0800
[INLONG-3381][Agent] Agent wait one minute for dataproxy to prepare topic config
---
.../src/main/java/org/apache/inlong/agent/utils/AgentUtils.java | 8 ++++++++
.../main/java/org/apache/inlong/agent/core/task/TaskWrapper.java | 2 ++
.../test/java/org/apache/inlong/agent/task/TestTaskWrapper.java | 2 +-
.../apache/inlong/agent/plugin/sources/reader/BinlogReader.java | 2 +-
.../apache/inlong/agent/plugin/sources/reader/TextFileReader.java | 5 ++++-
.../test/java/org/apache/inlong/agent/plugin/TestFileAgent.java | 2 +-
6 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index 5cccb32..e025401 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -211,6 +211,14 @@ public class AgentUtils {
}
}
+ public static void silenceSleepInMinute(long minutes) {
+ try {
+ TimeUnit.MINUTES.sleep(minutes);
+ } catch (Exception ignored) {
+ LOGGER.warn("silenceSleepInMs ", ignored);
+ }
+ }
+
public static String parseHexStr(String delimiter) throws IllegalArgumentException {
if (delimiter.trim().toLowerCase().startsWith(HEX_PREFIX)) {
//only one char
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
index 8041c0c..e055543 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
@@ -47,6 +47,7 @@ public class TaskWrapper extends AbstractStateWrapper {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskWrapper.class);
public static final int WAIT_FINISH_TIME_OUT = 1;
+ public static final int WAIT_BEGIN_TIME_MINUTE = 1;
private final TaskManager taskManager;
private final Task task;
@@ -204,6 +205,7 @@ public class TaskWrapper extends AbstractStateWrapper {
public void run() {
try {
LOGGER.info("start to run {}, retry time is {}", task.getTaskId(), retryTime.get());
+ AgentUtils.silenceSleepInMinute(WAIT_BEGIN_TIME_MINUTE);
doChangeState(State.RUNNING);
task.init();
submitThreadsAndWait();
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
index 272e9b1..e3ea435 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
@@ -76,7 +76,7 @@ public class TestTaskWrapper {
LOGGER.info("waiting for success");
TimeUnit.MILLISECONDS.sleep(100);
}
- await().atMost(20, TimeUnit.SECONDS).until(()
+ await().atMost(80, TimeUnit.SECONDS).until(()
-> reader.getCount() == writer.getWriterCount() + 1);
Assert.assertEquals("reader is not equals to writer",
reader.getCount(), writer.getWriterCount() + 1);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 54e15ed..138bb2a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -139,7 +139,7 @@ public class BinlogReader implements Reader {
tableWhiteList = jobConf.get(JOB_TABLE_WHITELIST, "[\\s\\S]*.*");
databaseWhiteList = jobConf.get(JOB_DATABASE_WHITELIST, "");
serverTimeZone = jobConf.get(JOB_DATABASE_SERVER_TIME_ZONE, "");
- offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "1000");
+ offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId();
offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index c42a415..a42dd78 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -192,8 +192,11 @@ public class TextFileReader extends AbstractReader {
@Override
public void destroy() {
+ if (stream == null) {
+ return;
+ }
AgentUtils.finallyClose(stream);
LOGGER.info("destroy reader with read {} num {}",
- streamMetric.getTagName(), streamMetric.getReadNum());
+ streamMetric.getTagName(), streamMetric.getReadNum());
}
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 861c91b..fd1e4ad 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -149,7 +149,7 @@ public class TestFileAgent {
jobs.forEach(
(s, jobWrapper) -> result.set(jobWrapper.getJob().getJobConf()
.get(JOB_DIR_FILTER_PATTERN).equals(testRootDir
- + FileSystems.getDefault().getSeparator() + "test1.dat"))
+ + FileSystems.getDefault().getSeparator() + "test0.dat"))
);
}
return result.get();