You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/23 01:45:06 UTC
[incubator-inlong] branch master updated: [INLONG-3304][Agent] Agent reader cost too much cpu (#3305)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4f58731 [INLONG-3304][Agent] Agent reader cost too much cpu (#3305)
4f58731 is described below
commit 4f58731ce691a3410edbaf835c515c47a5a61442
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Wed Mar 23 09:45:03 2022 +0800
[INLONG-3304][Agent] Agent reader cost too much cpu (#3305)
---
.../main/java/org/apache/inlong/agent/core/task/TaskWrapper.java | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
index ed1740d..8041c0c 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
@@ -17,6 +17,9 @@
package org.apache.inlong.agent.core.task;
+import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
@@ -32,6 +35,7 @@ import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.state.AbstractStateWrapper;
import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +55,7 @@ public class TaskWrapper extends AbstractStateWrapper {
private final int maxRetryTime;
private final int pushMaxWaitTime;
private final int pullMaxWaitTime;
+ private final int readWaitTime;
private ExecutorService executorService;
public TaskWrapper(AgentManager manager, Task task) {
@@ -64,6 +69,7 @@ public class TaskWrapper extends AbstractStateWrapper {
AgentConstants.TASK_PUSH_MAX_SECOND, AgentConstants.DEFAULT_TASK_PUSH_MAX_SECOND);
pullMaxWaitTime = conf.getInt(
AgentConstants.TASK_PULL_MAX_SECOND, AgentConstants.DEFAULT_TASK_PULL_MAX_SECOND);
+ readWaitTime = conf.getInt(JOB_READ_WAIT_TIMEOUT, DEFAULT_JOB_READ_WAIT_TIMEOUT);
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
@@ -91,6 +97,7 @@ public class TaskWrapper extends AbstractStateWrapper {
message = task.getReader().read();
}
}
+ AgentUtils.silenceSleepInMs(readWaitTime);
}
LOGGER.info("read end, task exception status is {}, read finish status is {}", isException(),
task.isReadFinished());