You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/23 01:45:06 UTC

[incubator-inlong] branch master updated: [INLONG-3304][Agent] Agent reader costs too much CPU (#3305)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f58731  [INLONG-3304][Agent] Agent reader costs too much CPU (#3305)
4f58731 is described below

commit 4f58731ce691a3410edbaf835c515c47a5a61442
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Wed Mar 23 09:45:03 2022 +0800

    [INLONG-3304][Agent] Agent reader costs too much CPU (#3305)
---
 .../main/java/org/apache/inlong/agent/core/task/TaskWrapper.java   | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
index ed1740d..8041c0c 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.agent.core.task;
 
+import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
@@ -32,6 +35,7 @@ import org.apache.inlong.agent.message.EndMessage;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.state.AbstractStateWrapper;
 import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +55,7 @@ public class TaskWrapper extends AbstractStateWrapper {
     private final int maxRetryTime;
     private final int pushMaxWaitTime;
     private final int pullMaxWaitTime;
+    private final int readWaitTime;
     private ExecutorService executorService;
 
     public TaskWrapper(AgentManager manager, Task task) {
@@ -64,6 +69,7 @@ public class TaskWrapper extends AbstractStateWrapper {
                 AgentConstants.TASK_PUSH_MAX_SECOND, AgentConstants.DEFAULT_TASK_PUSH_MAX_SECOND);
         pullMaxWaitTime = conf.getInt(
                 AgentConstants.TASK_PULL_MAX_SECOND, AgentConstants.DEFAULT_TASK_PULL_MAX_SECOND);
+        readWaitTime = conf.getInt(JOB_READ_WAIT_TIMEOUT, DEFAULT_JOB_READ_WAIT_TIMEOUT);
         if (executorService == null) {
             executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                     60L, TimeUnit.SECONDS,
@@ -91,6 +97,7 @@ public class TaskWrapper extends AbstractStateWrapper {
                         message = task.getReader().read();
                     }
                 }
+                AgentUtils.silenceSleepInMs(readWaitTime);
             }
             LOGGER.info("read end, task exception status is {}, read finish status is {}", isException(),
                             task.isReadFinished());