You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/03/25 13:39:48 UTC
[incubator-inlong] branch master updated: [INLONG-3359][Agent] Add semaphore for each proxy sink to prevent oom (#3360)
This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 40a6f16 [INLONG-3359][Agent] Add semaphore for each proxy sink to prevent oom (#3360)
40a6f16 is described below
commit 40a6f163fb7b138c9dc582fcd926c5b2b6fc6abc
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Fri Mar 25 21:39:44 2022 +0800
[INLONG-3359][Agent] Add semaphore for each proxy sink to prevent oom (#3360)
---
.../inlong/agent/constant/AgentConstants.java | 2 +-
.../inlong/agent/constant/CommonConstants.java | 3 ++
.../inlong/agent/plugin/sinks/ProxySink.java | 41 ++++++++++++----------
.../inlong/agent/plugin/sinks/SenderManager.java | 16 ++++++++-
.../agent/plugin/sources/reader/BinlogReader.java | 2 +-
5 files changed, 43 insertions(+), 21 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 590e229..1f2936e 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -145,7 +145,7 @@ public class AgentConstants {
public static final int DEFAULT_TASK_PULL_MAX_SECOND = 2;
public static final String CHANNEL_MEMORY_CAPACITY = "channel.memory.capacity";
- public static final int DEFAULT_CHANNEL_MEMORY_CAPACITY = 10000;
+ public static final int DEFAULT_CHANNEL_MEMORY_CAPACITY = 1000;
public static final String TRIGGER_CHECK_INTERVAL = "trigger.check.interval";
public static final int DEFAULT_TRIGGER_CHECK_INTERVAL = 2;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 1f5535f..2ab7ae6 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -61,6 +61,9 @@ public class CommonConstants {
// max size of single batch in bytes, default is 200KB.
public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 200000;
+ public static final String PROXY_MESSAGE_SEMAPHORE = "proxy.semaphore";
+ public static final int DEFAULT_PROXY_MESSAGE_SEMAPHORE = 10000;
+
public static final String PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = "proxy.group.queue.maxNumber";
public static final int DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = 10000;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index f974957..d03c30d 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -89,19 +89,21 @@ public class ProxySink extends AbstractSink {
@Override
public void write(Message message) {
- if (message != null) {
- message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongGroupId);
- message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId);
- extractStreamFromMessage(message, fieldSplitter);
- if (!(message instanceof EndMessage)) {
- ProxyMessage proxyMessage = ProxyMessage.parse(message);
- // add proxy message to cache.
- cache.compute(proxyMessage.getBatchKey(),
+ try {
+ if (message != null) {
+ senderManager.acquireSemaphore(1);
+ message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongGroupId);
+ message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId);
+ extractStreamFromMessage(message, fieldSplitter);
+ if (!(message instanceof EndMessage)) {
+ ProxyMessage proxyMessage = ProxyMessage.parse(message);
+ // add proxy message to cache.
+ cache.compute(proxyMessage.getBatchKey(),
(s, packProxyMessage) -> {
if (packProxyMessage == null) {
packProxyMessage = new PackProxyMessage(
- maxBatchSize, maxQueueNumber,
- maxBatchTimeoutMs,
+ maxBatchSize, maxQueueNumber,
+ maxBatchTimeoutMs,
proxyMessage.getInlongStreamId()
);
packProxyMessage.generateExtraMap(syncSend,
@@ -112,16 +114,19 @@ public class ProxySink extends AbstractSink {
//
return packProxyMessage;
});
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
inlongGroupId, inlongStreamId, System.currentTimeMillis());
- // increment the count of successful sinks
- sinkMetric.incSinkSuccessCount();
- streamMetric.incSinkSuccessCount();
- } else {
- // increment the count of failed sinks
- sinkMetric.incSinkFailCount();
- streamMetric.incSinkFailCount();
+ // increment the count of successful sinks
+ sinkMetric.incSinkSuccessCount();
+ streamMetric.incSinkSuccessCount();
+ } else {
+ // increment the count of failed sinks
+ sinkMetric.incSinkFailCount();
+ streamMetric.incSinkFailCount();
+ }
}
+ } catch (Exception e) {
+ LOGGER.error("write message to Proxy sink error", e);
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index c9c0c65..9f4c467 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -84,6 +85,7 @@ public class SenderManager {
private int ioThreadNum;
private boolean enableBusyWait;
+ private Semaphore semaphore;
public SenderManager(JobProfile jobConf, String inlongGroupId, String sourcePath) {
AgentConfiguration conf = AgentConfiguration.getAgentConf();
@@ -112,7 +114,8 @@ public class SenderManager {
CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
isFile = jobConf.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE);
taskPositionManager = TaskPositionManager.getTaskPositionManager();
-
+ semaphore = new Semaphore(jobConf.getInt(CommonConstants.PROXY_MESSAGE_SEMAPHORE,
+ CommonConstants.DEFAULT_PROXY_MESSAGE_SEMAPHORE));
ioThreadNum = jobConf.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
enableBusyWait = jobConf.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
@@ -139,6 +142,15 @@ public class SenderManager {
return senderList.get((SENDER_INDEX.getAndIncrement() & 0x7FFFFFFF) % senderList.size());
}
+ public void acquireSemaphore(int messageNum) {
+ try {
+ semaphore.acquire(messageNum);
+ } catch (Exception e) {
+ LOGGER.error("acquire messageNum {} fail, current semaphore {}",
+ messageNum, semaphore.availablePermits());
+ }
+ }
+
/**
* sender
*
@@ -209,6 +221,7 @@ public class SenderManager {
sendBatchAsync(jobId, groupId, streamId, bodyList, retry + 1, dataTime);
return;
}
+ semaphore.release(bodyList.size());
metric.incSendSuccessNum(bodyList.size());
if (sourcePath != null) {
taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size());
@@ -275,6 +288,7 @@ public class SenderManager {
bodyList, groupId, streamId, dataTime, "",
maxSenderTimeout, TimeUnit.SECONDS, extraMap
);
+ semaphore.release(bodyList.size());
} catch (Exception exception) {
LOGGER.error("Exception caught", exception);
// retry time
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index dea2541..54e15ed 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -147,7 +147,7 @@ public class BinlogReader implements Reader {
snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, "");
includeSchemaChanges = jobConf.get(JOB_DATABASE_INCLUDE_SCHEMA_CHANGES, "false");
historyMonitorDdl = jobConf.get(JOB_DATABASE_HISTORY_MONITOR_DDL, "false");
- binlogMessagesQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 10000));
+ binlogMessagesQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
instanceId = jobConf.getInstanceId();
finished = false;