You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/03/25 13:39:48 UTC

[incubator-inlong] branch master updated: [INLONG-3359][Agent] Add semaphore for each proxy sink to prevent oom (#3360)

This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 40a6f16  [INLONG-3359][Agent] Add semaphore for each proxy sink to prevent oom (#3360)
40a6f16 is described below

commit 40a6f163fb7b138c9dc582fcd926c5b2b6fc6abc
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Fri Mar 25 21:39:44 2022 +0800

    [INLONG-3359][Agent] Add semaphore for each proxy sink to prevent oom (#3360)
---
 .../inlong/agent/constant/AgentConstants.java      |  2 +-
 .../inlong/agent/constant/CommonConstants.java     |  3 ++
 .../inlong/agent/plugin/sinks/ProxySink.java       | 41 ++++++++++++----------
 .../inlong/agent/plugin/sinks/SenderManager.java   | 16 ++++++++-
 .../agent/plugin/sources/reader/BinlogReader.java  |  2 +-
 5 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 590e229..1f2936e 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -145,7 +145,7 @@ public class AgentConstants {
     public static final int DEFAULT_TASK_PULL_MAX_SECOND = 2;
 
     public static final String CHANNEL_MEMORY_CAPACITY = "channel.memory.capacity";
-    public static final int DEFAULT_CHANNEL_MEMORY_CAPACITY = 10000;
+    public static final int DEFAULT_CHANNEL_MEMORY_CAPACITY = 1000;
 
     public static final String TRIGGER_CHECK_INTERVAL = "trigger.check.interval";
     public static final int DEFAULT_TRIGGER_CHECK_INTERVAL = 2;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 1f5535f..2ab7ae6 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -61,6 +61,9 @@ public class CommonConstants {
     // max size of single batch in bytes, default is 200KB.
     public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 200000;
 
+    public static final String PROXY_MESSAGE_SEMAPHORE = "proxy.semaphore";
+    public static final int DEFAULT_PROXY_MESSAGE_SEMAPHORE = 10000;
+
     public static final String PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = "proxy.group.queue.maxNumber";
     public static final int DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = 10000;
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index f974957..d03c30d 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -89,19 +89,21 @@ public class ProxySink extends AbstractSink {
 
     @Override
     public void write(Message message) {
-        if (message != null) {
-            message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongGroupId);
-            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId);
-            extractStreamFromMessage(message, fieldSplitter);
-            if (!(message instanceof EndMessage)) {
-                ProxyMessage proxyMessage = ProxyMessage.parse(message);
-                // add proxy message to cache.
-                cache.compute(proxyMessage.getBatchKey(),
+        try {
+            if (message != null) {
+                senderManager.acquireSemaphore(1);
+                message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongGroupId);
+                message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId);
+                extractStreamFromMessage(message, fieldSplitter);
+                if (!(message instanceof EndMessage)) {
+                    ProxyMessage proxyMessage = ProxyMessage.parse(message);
+                    // add proxy message to cache.
+                    cache.compute(proxyMessage.getBatchKey(),
                         (s, packProxyMessage) -> {
                             if (packProxyMessage == null) {
                                 packProxyMessage = new PackProxyMessage(
-                                        maxBatchSize, maxQueueNumber,
-                                        maxBatchTimeoutMs,
+                                    maxBatchSize, maxQueueNumber,
+                                    maxBatchTimeoutMs,
                                     proxyMessage.getInlongStreamId()
                                 );
                                 packProxyMessage.generateExtraMap(syncSend,
@@ -112,16 +114,19 @@ public class ProxySink extends AbstractSink {
                             //
                             return packProxyMessage;
                         });
-                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
+                    AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
                         inlongGroupId, inlongStreamId, System.currentTimeMillis());
-                // increment the count of successful sinks
-                sinkMetric.incSinkSuccessCount();
-                streamMetric.incSinkSuccessCount();
-            } else {
-                // increment the count of failed sinks
-                sinkMetric.incSinkFailCount();
-                streamMetric.incSinkFailCount();
+                    // increment the count of successful sinks
+                    sinkMetric.incSinkSuccessCount();
+                    streamMetric.incSinkSuccessCount();
+                } else {
+                    // increment the count of failed sinks
+                    sinkMetric.incSinkFailCount();
+                    streamMetric.incSinkFailCount();
+                }
             }
+        } catch (Exception e) {
+            LOGGER.error("write message to Proxy sink error", e);
         }
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index c9c0c65..9f4c467 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -84,6 +85,7 @@ public class SenderManager {
 
     private int ioThreadNum;
     private boolean enableBusyWait;
+    private Semaphore semaphore;
 
     public SenderManager(JobProfile jobConf, String inlongGroupId, String sourcePath) {
         AgentConfiguration conf = AgentConfiguration.getAgentConf();
@@ -112,7 +114,8 @@ public class SenderManager {
             CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
         isFile = jobConf.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE);
         taskPositionManager = TaskPositionManager.getTaskPositionManager();
-
+        semaphore = new Semaphore(jobConf.getInt(CommonConstants.PROXY_MESSAGE_SEMAPHORE,
+            CommonConstants.DEFAULT_PROXY_MESSAGE_SEMAPHORE));
         ioThreadNum = jobConf.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
                 CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
         enableBusyWait = jobConf.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
@@ -139,6 +142,15 @@ public class SenderManager {
         return senderList.get((SENDER_INDEX.getAndIncrement() & 0x7FFFFFFF) % senderList.size());
     }
 
+    public void acquireSemaphore(int messageNum) {
+        try {
+            semaphore.acquire(messageNum);
+        } catch (Exception e) {
+            LOGGER.error("acquire messageNum {} fail, current semaphore {}",
+                messageNum, semaphore.availablePermits());
+        }
+    }
+
     /**
      * sender
      *
@@ -209,6 +221,7 @@ public class SenderManager {
                 sendBatchAsync(jobId, groupId, streamId, bodyList, retry + 1, dataTime);
                 return;
             }
+            semaphore.release(bodyList.size());
             metric.incSendSuccessNum(bodyList.size());
             if (sourcePath != null) {
                 taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size());
@@ -275,6 +288,7 @@ public class SenderManager {
                 bodyList, groupId, streamId, dataTime, "",
                 maxSenderTimeout, TimeUnit.SECONDS, extraMap
             );
+            semaphore.release(bodyList.size());
         } catch (Exception exception) {
             LOGGER.error("Exception caught", exception);
             // retry time
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index dea2541..54e15ed 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -147,7 +147,7 @@ public class BinlogReader implements Reader {
         snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, "");
         includeSchemaChanges = jobConf.get(JOB_DATABASE_INCLUDE_SCHEMA_CHANGES, "false");
         historyMonitorDdl = jobConf.get(JOB_DATABASE_HISTORY_MONITOR_DDL, "false");
-        binlogMessagesQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 10000));
+        binlogMessagesQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
         instanceId = jobConf.getInstanceId();
         finished = false;