You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2023/06/21 02:25:30 UTC
[inlong] branch master updated: [INLONG-8294][DataProxy] Optimize the log output in the Sink module (#8295)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0936f81cb7 [INLONG-8294][DataProxy] Optimize the log output in the Sink module (#8295)
0936f81cb7 is described below
commit 0936f81cb73d85bba7fbc081b9462e0916eb2cb1
Author: Goson Zhang <46...@qq.com>
AuthorDate: Wed Jun 21 10:25:24 2023 +0800
[INLONG-8294][DataProxy] Optimize the log output in the Sink module (#8295)
---
.../inlong/dataproxy/sink/common/SinkContext.java | 68 +++------------
.../inlong/dataproxy/sink/mq/BatchPackManager.java | 68 +++++++--------
.../sink/mq/BatchPackProfileCallback.java | 4 +-
.../sink/mq/MessageQueueClusterProducer.java | 2 +-
.../sink/mq/MessageQueueZoneProducer.java | 18 ++--
.../dataproxy/sink/mq/MessageQueueZoneSink.java | 96 +++++++++++++++-------
.../sink/mq/MessageQueueZoneSinkContext.java | 31 ++++---
.../dataproxy/sink/mq/MessageQueueZoneWorker.java | 43 ++++++----
.../inlong/dataproxy/sink/mq/PackProfile.java | 12 +--
.../sink/mq/RandomCacheClusterSelector.java | 2 +-
.../dataproxy/sink/mq/SimplePackProfile.java | 9 +-
.../dataproxy/sink/mq/kafka/KafkaHandler.java | 24 +++---
.../dataproxy/sink/mq/pulsar/PulsarHandler.java | 34 ++++----
.../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 36 ++++----
.../source2/v1msg/InlongTcpSourceCallback.java | 26 +++---
.../apache/inlong/dataproxy/utils/BufferQueue.java | 47 ++++++-----
.../inlong/dataproxy/utils/SizeSemaphore.java | 2 +-
17 files changed, 262 insertions(+), 260 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
index 102c88a37d..9ccdb82a05 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
@@ -25,9 +25,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.stats.MonitorIndex;
import org.apache.inlong.dataproxy.metrics.stats.MonitorStats;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
-import org.apache.inlong.dataproxy.sink.mq.PackProfile;
import org.apache.inlong.dataproxy.sink.mq.pulsar.PulsarHandler;
-import org.apache.inlong.dataproxy.utils.BufferQueue;
import org.apache.commons.lang.ClassUtils;
import org.apache.flume.Channel;
@@ -35,22 +33,18 @@ import org.apache.flume.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Date;
-import java.util.Timer;
-import java.util.TimerTask;
-
/**
* SinkContext
*/
public class SinkContext {
- public static final Logger LOG = LoggerFactory.getLogger(SinkContext.class);
-
public static final String KEY_MAX_THREADS = "maxThreads";
public static final String KEY_PROCESSINTERVAL = "processInterval";
public static final String KEY_RELOADINTERVAL = "reloadInterval";
public static final String KEY_MESSAGE_QUEUE_HANDLER = "messageQueueHandler";
+ protected static final Logger logger = LoggerFactory.getLogger(SinkContext.class);
+
protected final String clusterId;
protected final String sinkName;
protected final Context sinkContext;
@@ -62,7 +56,6 @@ public class SinkContext {
protected final long reloadInterval;
//
protected final DataProxyMetricItemSet metricItemSet;
- protected Timer reloadTimer;
// file metric statistic
protected MonitorIndex monitorIndex = null;
private MonitorStats monitorStats = null;
@@ -100,23 +93,12 @@ public class SinkContext {
this.monitorIndex.start();
this.monitorStats.start();
}
- try {
- this.reload();
- this.setReloadTimer();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
}
/**
* close
*/
public void close() {
- try {
- this.reloadTimer.cancel();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
// stop file statistic index
if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
if (monitorIndex != null) {
@@ -153,26 +135,6 @@ public class SinkContext {
}
}
- /**
- * setReloadTimer
- */
- protected void setReloadTimer() {
- reloadTimer = new Timer(true);
- TimerTask task = new TimerTask() {
-
- public void run() {
- reload();
- }
- };
- reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
- }
-
- /**
- * reload
- */
- public void reload() {
- }
-
/**
* get clusterId
*
@@ -202,7 +164,7 @@ public class SinkContext {
/**
* get channel
- *
+ *
* @return the channel
*/
public Channel getChannel() {
@@ -255,12 +217,11 @@ public class SinkContext {
Class<?> handlerClass = ClassUtils.getClass(eventHandlerClass);
Object handlerObject = handlerClass.getDeclaredConstructor().newInstance();
if (handlerObject instanceof EventHandler) {
- EventHandler handler = (EventHandler) handlerObject;
- return handler;
+ return (EventHandler) handlerObject;
}
} catch (Throwable t) {
- LOG.error("Fail to init EventHandler,handlerClass:{},error:{}",
- eventHandlerClass, t.getMessage(), t);
+ logger.error("{} fail to init EventHandler,handlerClass:{},error:{}",
+ this.sinkName, eventHandlerClass, t.getMessage(), t);
}
return null;
}
@@ -271,26 +232,17 @@ public class SinkContext {
public MessageQueueHandler createMessageQueueHandler(CacheClusterConfig config) {
String strHandlerClass = config.getParams().getOrDefault(KEY_MESSAGE_QUEUE_HANDLER,
PulsarHandler.class.getName());
- LOG.info("mq handler class = {}", strHandlerClass);
+ logger.info("{}'s mq handler class = {}", this.sinkName, strHandlerClass);
try {
Class<?> handlerClass = ClassUtils.getClass(strHandlerClass);
Object handlerObject = handlerClass.getDeclaredConstructor().newInstance();
if (handlerObject instanceof MessageQueueHandler) {
- MessageQueueHandler handler = (MessageQueueHandler) handlerObject;
- return handler;
+ return (MessageQueueHandler) handlerObject;
}
} catch (Throwable t) {
- LOG.error("Fail to init MessageQueueHandler,handlerClass:{},error:{}",
- strHandlerClass, t.getMessage(), t);
+ logger.error("{} fail to init MessageQueueHandler,handlerClass:{},error:{}",
+ this.sinkName, strHandlerClass, t.getMessage(), t);
}
return null;
}
-
- /**
- * createBufferQueue
- * @return
- */
- public static BufferQueue<PackProfile> createBufferQueue() {
- return new BufferQueue<>(CommonConfigHolder.getInstance().getMaxBufferQueueSizeKb());
- }
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
index f1b4f54473..db671d1ad9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
@@ -18,7 +18,6 @@
package org.apache.inlong.dataproxy.sink.mq;
import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.dataproxy.utils.BufferQueue;
import org.apache.inlong.sdk.commons.protocol.InlongId;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
@@ -42,7 +41,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class BatchPackManager {
- public static final Logger LOG = LoggerFactory.getLogger(BatchPackManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(BatchPackManager.class);
+
public static final String KEY_DISPATCH_TIMEOUT = "dispatchTimeout";
public static final String KEY_DISPATCH_MAX_PACKCOUNT = "dispatchMaxPackCount";
public static final String KEY_DISPATCH_MAX_PACKSIZE = "dispatchMaxPackSize";
@@ -54,8 +54,7 @@ public class BatchPackManager {
private final long dispatchTimeout;
private final long maxPackCount;
private final long maxPackSize;
- private final String sinkName;
- private final BufferQueue<PackProfile> dispatchQueue;
+ private final MessageQueueZoneSink mqZoneSink;
private final ConcurrentHashMap<String, PackProfile> profileCache = new ConcurrentHashMap<>();
// flag that manager need to output overtime data.
private final AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
@@ -65,13 +64,11 @@ public class BatchPackManager {
/**
* Constructor
*
- * @param sinkName the sink name
+ * @param mqZoneSink the mq zone sink
* @param context the process context
- * @param dispatchQueue the batch queue
*/
- public BatchPackManager(String sinkName, Context context, BufferQueue<PackProfile> dispatchQueue) {
- this.sinkName = sinkName;
- this.dispatchQueue = dispatchQueue;
+ public BatchPackManager(MessageQueueZoneSink mqZoneSink, Context context) {
+ this.mqZoneSink = mqZoneSink;
this.dispatchTimeout = context.getLong(KEY_DISPATCH_TIMEOUT, DEFAULT_DISPATCH_TIMEOUT);
this.maxPackCount = context.getLong(KEY_DISPATCH_MAX_PACKCOUNT, DEFAULT_DISPATCH_MAX_PACKCOUNT);
this.maxPackSize = context.getLong(KEY_DISPATCH_MAX_PACKSIZE, DEFAULT_DISPATCH_MAX_PACKSIZE);
@@ -89,22 +86,22 @@ public class BatchPackManager {
// find dispatch profile
PackProfile dispatchProfile = this.profileCache.get(dispatchKey);
if (dispatchProfile == null) {
- dispatchProfile = new BatchPackProfile(eventUid, event.getInlongGroupId(), event.getInlongStreamId(),
- dispatchTime);
+ dispatchProfile = new BatchPackProfile(eventUid, event.getInlongGroupId(),
+ event.getInlongStreamId(), dispatchTime);
this.profileCache.put(dispatchKey, dispatchProfile);
}
// add event
- boolean addResult = dispatchProfile.addEvent(event, maxPackCount, maxPackSize);
- if (!addResult) {
+ if (!dispatchProfile.addEvent(event, maxPackCount, maxPackSize)) {
BatchPackProfile newDispatchProfile = new BatchPackProfile(eventUid, event.getInlongGroupId(),
event.getInlongStreamId(), dispatchTime);
PackProfile oldDispatchProfile = this.profileCache.put(dispatchKey, newDispatchProfile);
- this.dispatchQueue.acquire(oldDispatchProfile.getSize());
- this.dispatchQueue.offer(oldDispatchProfile);
- outCounter.addAndGet(dispatchProfile.getCount());
+ if (oldDispatchProfile != null) {
+ this.mqZoneSink.acquireAndOfferDispatchedRecord(oldDispatchProfile);
+ }
+ this.outCounter.addAndGet(dispatchProfile.getCount());
newDispatchProfile.addEvent(event, maxPackCount, maxPackSize);
}
- inCounter.incrementAndGet();
+ this.inCounter.incrementAndGet();
}
/**
@@ -122,24 +119,21 @@ public class BatchPackManager {
dispatchProfile.setCallback(callback);
// offer queue
for (ProxyEvent event : packEvent.getEvents()) {
- inCounter.incrementAndGet();
- boolean addResult = dispatchProfile.addEvent(event, maxPackCount, maxPackSize);
- // dispatch profile is full
- if (!addResult) {
- outCounter.addAndGet(dispatchProfile.getCount());
- this.dispatchQueue.acquire(dispatchProfile.getSize());
- this.dispatchQueue.offer(dispatchProfile);
+ if (!dispatchProfile.addEvent(event, maxPackCount, maxPackSize)) {
+ // dispatch profile is full
+ this.outCounter.addAndGet(dispatchProfile.getCount());
+ this.mqZoneSink.acquireAndOfferDispatchedRecord(dispatchProfile);
dispatchProfile = new BatchPackProfile(eventUid, event.getInlongGroupId(), event.getInlongStreamId(),
dispatchTime);
dispatchProfile.setCallback(callback);
dispatchProfile.addEvent(event, maxPackCount, maxPackSize);
}
+ this.inCounter.incrementAndGet();
}
// last dispatch profile
if (dispatchProfile.getEvents().size() > 0) {
- outCounter.addAndGet(dispatchProfile.getCount());
- this.dispatchQueue.acquire(dispatchProfile.getSize());
- this.dispatchQueue.offer(dispatchProfile);
+ this.outCounter.addAndGet(dispatchProfile.getCount());
+ this.mqZoneSink.acquireAndOfferDispatchedRecord(dispatchProfile);
}
}
@@ -156,10 +150,9 @@ public class BatchPackManager {
long dispatchTime = msgTime - msgTime % MINUTE_MS;
SimplePackProfile profile = new SimplePackProfile(uid, inlongGroupId, inlongStreamId, dispatchTime);
profile.addEvent(event, maxPackCount, maxPackSize);
- this.dispatchQueue.acquire(profile.getSize());
- this.dispatchQueue.offer(profile);
- outCounter.addAndGet(profile.getCount());
- inCounter.incrementAndGet();
+ this.mqZoneSink.acquireAndOfferDispatchedRecord(profile);
+ this.outCounter.addAndGet(profile.getCount());
+ this.inCounter.incrementAndGet();
}
/**
@@ -171,7 +164,7 @@ public class BatchPackManager {
return;
}
int profileSize = profileCache.size();
- int dispatchSize = dispatchQueue.size();
+ int dispatchSize = this.mqZoneSink.getDispatchQueueSize();
long currentTime = System.currentTimeMillis();
long createThreshold = currentTime - dispatchTimeout;
List<String> removeKeys = new ArrayList<>();
@@ -188,19 +181,18 @@ public class BatchPackManager {
removeKeys.forEach((key) -> {
PackProfile dispatchProfile = this.profileCache.remove(key);
if (dispatchProfile != null) {
- this.dispatchQueue.acquire(dispatchProfile.getSize());
- dispatchQueue.offer(dispatchProfile);
- outCounter.addAndGet(dispatchProfile.getCount());
+ this.mqZoneSink.acquireAndOfferDispatchedRecord(dispatchProfile);
+ this.outCounter.addAndGet(dispatchProfile.getCount());
}
});
long hisInCnt = inCounter.getAndSet(0);
long hisOutCnt = outCounter.getAndSet(0);
if (!removeKeys.isEmpty()) {
- LOG.info("{} output overtime data, profileCacheSize: before={}, after={},"
+ logger.info("{} output overtime data, profileCacheSize: before={}, after={},"
+ " dispatchQueueSize: before={}, after={}, eventCount: {},"
+ " inCounter: {}, outCounter: {}",
- sinkName, profileSize, profileCache.size(), dispatchSize, dispatchQueue.size(),
- eventCount, hisInCnt, hisOutCnt);
+ mqZoneSink.getName(), profileSize, profileCache.size(), dispatchSize,
+ this.mqZoneSink.getDispatchQueueSize(), eventCount, hisInCnt, hisOutCnt);
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfileCallback.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfileCallback.java
index e3b2f480c9..c11d864a3d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfileCallback.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfileCallback.java
@@ -28,8 +28,8 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class BatchPackProfileCallback {
- private AtomicInteger ackingCount;
- private SourceCallback callback;
+ private final AtomicInteger ackingCount;
+ private final SourceCallback callback;
/**
* Constructor
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueClusterProducer.java
index 46bdabdeee..77cf27b24f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueClusterProducer.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueClusterProducer.java
@@ -31,7 +31,7 @@ import java.util.Set;
*/
public class MessageQueueClusterProducer implements LifecycleAware {
- public static final Logger LOG = LoggerFactory.getLogger(MessageQueueClusterProducer.class);
+ private static final Logger logger = LoggerFactory.getLogger(MessageQueueClusterProducer.class);
private final String workerName;
private final CacheClusterConfig config;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
index 6ef2576767..be20c21ccf 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
@@ -39,7 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class MessageQueueZoneProducer {
- public static final Logger LOG = LoggerFactory.getLogger(MessageQueueZoneProducer.class);
+ private static final Logger logger = LoggerFactory.getLogger(MessageQueueZoneProducer.class);
private static final long MAX_RESERVED_TIME = 60 * 1000L;
private final MessageQueueZoneSink zoneSink;
private final MessageQueueZoneSinkContext context;
@@ -71,10 +71,10 @@ public class MessageQueueZoneProducer {
*/
public void start() {
try {
- LOG.info("start MessageQueueZoneProducer:{}", zoneSink.getName());
+ logger.info("start MessageQueueZoneProducer:{}", zoneSink.getName());
this.reloadMetaConfig();
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ logger.error(e.getMessage(), e);
}
}
@@ -140,7 +140,7 @@ public class MessageQueueZoneProducer {
tmpProducer.stop();
}
}
- LOG.info("Clear {}'s expired cluster producer {}", zoneSink.getName(), expired);
+ logger.info("{} cleared expired cluster producer {}", zoneSink.getName(), expired);
}
/**
@@ -264,17 +264,17 @@ public class MessageQueueZoneProducer {
return;
}
if (zoneSink.isMqClusterStarted()) {
- LOG.info("Reload {}'s cluster info, current cluster are {}, removed {}, created {}",
+ logger.info("{} reload cluster info, current cluster are {}, removed {}, created {}",
zoneSink.getName(), lastClusterNames, needRmvs, addedItems);
} else {
zoneSink.setMQClusterStarted();
ConfigManager.getInstance().setMqClusterReady();
- LOG.info(
- "Reload {}'s cluster info, and updated sink status, current cluster are {}, removed {}, created {}",
+ logger.info(
+ "{} reload cluster info, and updated sink status, current cluster are {}, removed {}, created {}",
zoneSink.getName(), lastClusterNames, needRmvs, addedItems);
}
} catch (Throwable e) {
- LOG.error("Reload cluster info failure", e);
+ logger.error("{} reload cluster info failure", zoneSink.getName(), e);
}
}
@@ -283,7 +283,7 @@ public class MessageQueueZoneProducer {
if (curTopicSet.isEmpty() || lastRefreshTopics.equals(curTopicSet)) {
return;
}
- LOG.info("Reload {}'s topics changed, current topics are {}, last topics are {}",
+ logger.info("{} reload topics changed, current topics are {}, last topics are {}",
zoneSink.getName(), curTopicSet, lastRefreshTopics);
lastRefreshTopics.addAll(curTopicSet);
for (MessageQueueClusterProducer clusterProducer : this.usingClusterMap.values()) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index 0a91655ce4..aee7d09c8f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -17,9 +17,10 @@
package org.apache.inlong.dataproxy.sink.mq;
+import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.sink.common.SinkContext;
import org.apache.inlong.dataproxy.utils.BufferQueue;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
@@ -47,7 +48,9 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class MessageQueueZoneSink extends AbstractSink implements Configurable, ConfigUpdateCallback {
- public static final Logger LOG = LoggerFactory.getLogger(MessageQueueZoneSink.class);
+ private static final Logger logger = LoggerFactory.getLogger(MessageQueueZoneSink.class);
+ // log print count
+ private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
private final long MQ_CLUSTER_STATUS_CHECK_DUR_MS = 2000L;
@@ -56,7 +59,8 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
private final List<MessageQueueZoneWorker> workers = new ArrayList<>();
// message group
private BatchPackManager dispatchManager;
- private BufferQueue<PackProfile> dispatchQueue;
+ private final BufferQueue<PackProfile> dispatchQueue =
+ new BufferQueue<>(CommonConfigHolder.getInstance().getMaxBufferQueueSizeKb());
// scheduled thread pool
// reload
// dispatch
@@ -71,6 +75,7 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
private volatile boolean isShutdown = false;
// whether mq cluster connected
private volatile boolean mqClusterStarted = false;
+
/**
* configure
*
@@ -78,7 +83,7 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
*/
@Override
public void configure(Context context) {
- LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+ logger.info("{} start to configure, context:{}.", this.getName(), context.toString());
this.parentContext = context;
}
@@ -87,16 +92,14 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
*/
@Override
public void start() {
+ if (getChannel() == null) {
+ logger.error("{}'s channel is null", this.getName());
+ }
try {
ConfigManager.getInstance().regMetaConfigChgCallback(this);
- // build dispatch queue
- this.dispatchQueue = SinkContext.createBufferQueue();
- this.context = new MessageQueueZoneSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
- if (getChannel() == null) {
- LOG.error(getName() + "'s channel is null");
- }
+ this.context = new MessageQueueZoneSinkContext(this, parentContext, getChannel());
this.context.start();
- this.dispatchManager = new BatchPackManager(getName(), parentContext, dispatchQueue);
+ this.dispatchManager = new BatchPackManager(this, parentContext);
this.scheduledPool = Executors.newScheduledThreadPool(2);
// dispatch
this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
@@ -112,16 +115,18 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
this.zoneProducer.start();
// start configure change listener thread
this.configListener = new Thread(new ConfigChangeProcessor());
- this.configListener.setName(getName() + " configure listener");
+ this.configListener.setName(getName() + "-configure-listener");
this.configListener.start();
// create worker
+ MessageQueueZoneWorker zoneWorker;
for (int i = 0; i < context.getMaxThreads(); i++) {
- MessageQueueZoneWorker worker = new MessageQueueZoneWorker(this.getName(), i, context, zoneProducer);
- worker.start();
- this.workers.add(worker);
+ zoneWorker = new MessageQueueZoneWorker(this, i,
+ context.getProcessInterval(), zoneProducer);
+ zoneWorker.start();
+ this.workers.add(zoneWorker);
}
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ logger.error("{} start failure", this.getName(), e);
}
super.start();
}
@@ -147,7 +152,7 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
try {
worker.close();
} catch (Throwable e) {
- LOG.error(e.getMessage(), e);
+ logger.error("{} stop Zone worker failure", this.getName(), e);
}
}
this.context.close();
@@ -208,17 +213,53 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
this.context.addSendFailMetric();
return Status.READY;
} catch (Throwable t) {
- LOG.error("Process event failed!" + this.getName(), t);
+ if (logCounter.shouldPrint()) {
+ logger.error("{} process event failed!", this.getName(), t);
+ }
try {
tx.rollback();
} catch (Throwable e) {
- LOG.error("Channel take transaction rollback exception:" + getName(), e);
+ if (logCounter.shouldPrint()) {
+ logger.error("{} channel take transaction rollback exception", this.getName(), e);
+ }
}
return Status.BACKOFF;
} finally {
tx.close();
}
}
+ public boolean isMqClusterStarted() {
+ return mqClusterStarted;
+ }
+
+ public void setMQClusterStarted() {
+ this.mqClusterStarted = true;
+ }
+
+ public void acquireAndOfferDispatchedRecord(PackProfile record) {
+ this.dispatchQueue.acquire(record.getSize());
+ this.dispatchQueue.offer(record);
+ }
+
+ public void offerDispatchRecord(PackProfile record) {
+ this.dispatchQueue.offer(record);
+ }
+
+ public PackProfile pollDispatchedRecord() {
+ return this.dispatchQueue.pollRecord();
+ }
+
+ public void releaseAcquiredSizePermit(PackProfile record) {
+ this.dispatchQueue.release(record.getSize());
+ }
+
+ public int getDispatchQueueSize() {
+ return this.dispatchQueue.size();
+ }
+
+ public int getDispatchAvailablePermits() {
+ return this.dispatchQueue.availablePermits();
+ }
@Override
public void update() {
@@ -229,14 +270,13 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
syncLock.notifyAll();
}
- public boolean isMqClusterStarted() {
- return mqClusterStarted;
- }
-
- public void setMQClusterStarted() {
- this.mqClusterStarted = true;
- }
-
+ /**
+ * ConfigChangeProcessor
+ *
+ * Metadata configuration change listener class. When a metadata change notification
+ * arrives, it checks and updates the mapping relationship between the MQ cluster
+ * information and the configured inlongid-to-Topic mapping.
+ */
private class ConfigChangeProcessor implements Runnable {
@Override
@@ -246,7 +286,7 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
try {
syncLock.wait();
} catch (InterruptedException e) {
- LOG.error("{} config-change processor meet interrupt, exit!", getName());
+ logger.error("{} config-change processor meet interrupt, exit!", getName());
break;
} catch (Throwable e2) {
//
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 9b60cffa3d..becfade7bd 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -26,7 +26,6 @@ import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.common.SinkContext;
-import org.apache.inlong.dataproxy.utils.BufferQueue;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
import org.apache.commons.lang.ClassUtils;
@@ -49,8 +48,7 @@ public class MessageQueueZoneSinkContext extends SinkContext {
public static final String PREFIX_PRODUCER = "producer.";
public static final String KEY_COMPRESS_TYPE = "compressType";
- private final BufferQueue<PackProfile> dispatchQueue;
-
+ private final MessageQueueZoneSink mqZoneSink;
private final String proxyClusterId;
private final String nodeId;
private final Context producerContext;
@@ -60,10 +58,9 @@ public class MessageQueueZoneSinkContext extends SinkContext {
/**
* Constructor
*/
- public MessageQueueZoneSinkContext(String sinkName, Context context, Channel channel,
- BufferQueue<PackProfile> dispatchQueue) {
- super(sinkName, context, channel);
- this.dispatchQueue = dispatchQueue;
+ public MessageQueueZoneSinkContext(MessageQueueZoneSink mqZoneSink, Context context, Channel channel) {
+ super(mqZoneSink.getName(), context, channel);
+ this.mqZoneSink = mqZoneSink;
// proxyClusterId
this.proxyClusterId = CommonConfigHolder.getInstance().getClusterName();
// nodeId
@@ -100,12 +97,12 @@ public class MessageQueueZoneSinkContext extends SinkContext {
}
/**
- * get dispatchQueue
- *
- * @return the dispatchQueue
+ * get message queue zone sink
+ *
+ * @return the zone sink
*/
- public BufferQueue<PackProfile> getDispatchQueue() {
- return dispatchQueue;
+ public MessageQueueZoneSink getMqZoneSink() {
+ return mqZoneSink;
}
/**
@@ -245,12 +242,13 @@ public class MessageQueueZoneSinkContext extends SinkContext {
String mqName, String topic, long sendTime,
DataProxyErrCode errCode, String errMsg) {
if (currentRecord.isResend()) {
- dispatchQueue.offer(currentRecord);
+ this.mqZoneSink.offerDispatchRecord(currentRecord);
fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILRETRY);
this.addSendResultMetric(currentRecord, mqName, topic, false, sendTime);
} else {
- currentRecord.fail(errCode, errMsg);
+ this.mqZoneSink.releaseAcquiredSizePermit(currentRecord);
fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILDROPPED);
+ currentRecord.fail(errCode, errMsg);
}
}
@@ -267,11 +265,10 @@ public class MessageQueueZoneSinkContext extends SinkContext {
configurable.configure(new Context(CommonConfigHolder.getInstance().getProperties()));
}
if (selectorObject instanceof CacheClusterSelector) {
- CacheClusterSelector selector = (CacheClusterSelector) selectorObject;
- return selector;
+ return (CacheClusterSelector) selectorObject;
}
} catch (Throwable t) {
- LOG.error("Fail to init CacheClusterSelector,selectorClass:{},error:{}",
+ logger.error("Fail to init CacheClusterSelector,selectorClass:{},error:{}",
strSelectorClass, t.getMessage(), t);
}
return null;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
index 8a2e4c5abe..ad61167295 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
@@ -17,6 +17,8 @@
package org.apache.inlong.dataproxy.sink.mq;
+import org.apache.inlong.common.monitor.LogCounter;
+
import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,22 +28,24 @@ import org.slf4j.LoggerFactory;
*/
public class MessageQueueZoneWorker extends Thread {
- public static final Logger LOG = LoggerFactory.getLogger(MessageQueueZoneWorker.class);
-
+ private static final Logger logger = LoggerFactory.getLogger(MessageQueueZoneWorker.class);
+ // log print count
+ private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
private final String workerName;
- private final MessageQueueZoneSinkContext context;
-
- private MessageQueueZoneProducer zoneProducer;
+ private final long fetchWaitMs;
+ private final MessageQueueZoneSink mqZoneSink;
+ private final MessageQueueZoneProducer zoneProducer;
private LifecycleState status;
/**
* Constructor
*/
- public MessageQueueZoneWorker(String sinkName, int workerIndex, MessageQueueZoneSinkContext context,
- MessageQueueZoneProducer zoneProducer) {
+ public MessageQueueZoneWorker(MessageQueueZoneSink mqZoneSink, int workerIndex,
+ long fetchWaitMs, MessageQueueZoneProducer zoneProducer) {
super();
- this.workerName = sinkName + "-worker-" + workerIndex;
- this.context = context;
+ this.mqZoneSink = mqZoneSink;
+ this.workerName = mqZoneSink.getName() + "-worker-" + workerIndex;
+ this.fetchWaitMs = fetchWaitMs;
this.zoneProducer = zoneProducer;
this.status = LifecycleState.IDLE;
}
@@ -70,25 +74,28 @@ public class MessageQueueZoneWorker extends Thread {
*/
@Override
public void run() {
- LOG.info(String.format("start MessageQueueZoneWorker:%s", this.workerName));
+ logger.info("{} start message zone worker", this.workerName);
+ PackProfile profile = null;
while (status != LifecycleState.STOP) {
- PackProfile profile = null;
try {
- profile = context.getDispatchQueue().pollRecord();
+ profile = this.mqZoneSink.pollDispatchedRecord();
if (profile == null) {
this.sleepOneInterval();
continue;
}
// send
this.zoneProducer.send(profile);
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
+ } catch (Throwable e1) {
if (profile != null) {
- context.getDispatchQueue().offer(profile);
+ this.mqZoneSink.offerDispatchRecord(profile);
+ }
+ if (logCounter.shouldPrint()) {
+ logger.error("{} send message failure", workerName, e1);
}
this.sleepOneInterval();
}
}
+ logger.info("{} exit message zone worker", this.workerName);
}
/**
@@ -96,9 +103,11 @@ public class MessageQueueZoneWorker extends Thread {
*/
private void sleepOneInterval() {
try {
- Thread.sleep(context.getProcessInterval());
+ Thread.sleep(fetchWaitMs);
} catch (InterruptedException e1) {
- LOG.error(e1.getMessage(), e1);
+ if (logCounter.shouldPrint()) {
+ logger.error("{} wait poll record interrupted", workerName, e1);
+ }
}
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
index 1867e6a8ce..87ebe736c8 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
@@ -33,8 +33,8 @@ public abstract class PackProfile {
private final long dispatchTime;
private final long createTime = System.currentTimeMillis();
private final String uid;
- protected long count = 0;
- protected long size = 0;
+ protected int count = 0;
+ protected int size = 0;
protected final boolean enableRetryAfterFailure;
protected final int maxRetries;
protected int retries = 0;
@@ -96,7 +96,7 @@ public abstract class PackProfile {
*
* @return the count
*/
- public long getCount() {
+ public int getCount() {
return count;
}
@@ -105,7 +105,7 @@ public abstract class PackProfile {
*
* @param count the count to set
*/
- public void setCount(long count) {
+ public void setCount(int count) {
this.count = count;
}
@@ -114,7 +114,7 @@ public abstract class PackProfile {
*
* @return the size
*/
- public long getSize() {
+ public int getSize() {
return size;
}
@@ -123,7 +123,7 @@ public abstract class PackProfile {
*
* @param size the size to set
*/
- public void setSize(long size) {
+ public void setSize(int size) {
this.size = size;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/RandomCacheClusterSelector.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/RandomCacheClusterSelector.java
index 662cadb173..fdf23d49de 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/RandomCacheClusterSelector.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/RandomCacheClusterSelector.java
@@ -44,7 +44,7 @@ public class RandomCacheClusterSelector implements CacheClusterSelector, Configu
/**
* select
- * @param allClusterList
+ * @param allConfigList
* @return
*/
@Override
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
index c816feb14e..5949d38e4e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
@@ -25,7 +25,6 @@ import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.base.SinkRspEvent;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.source2.InLongMessageHandler;
-import org.apache.inlong.dataproxy.utils.Constants;
import org.apache.inlong.sdk.commons.protocol.EventConstants;
import org.apache.inlong.sdk.commons.protocol.InlongId;
@@ -46,10 +45,9 @@ import java.util.Map;
*/
public class SimplePackProfile extends PackProfile {
- // log print count
- private static final LogCounter logCounter =
- new LogCounter(10, 100000, 30 * 1000);
private static final Logger logger = LoggerFactory.getLogger(SimplePackProfile.class);
+ // log print count
+ private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
private static final long MINUTE_MS = 60L * 1000;
private boolean needRspEvent = false;
private Channel channel;
@@ -154,9 +152,10 @@ public class SimplePackProfile extends PackProfile {
*/
public Map<String, String> getPropsToMQ() {
Map<String, String> result = new HashMap<>();
- result.put(Constants.HEADER_KEY_SOURCE_TIME, event.getHeaders().get(AttributeConstants.RCV_TIME));
+ result.put(AttributeConstants.RCV_TIME, event.getHeaders().get(AttributeConstants.RCV_TIME));
result.put(ConfigConstants.MSG_ENCODE_VER, event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER));
result.put(EventConstants.HEADER_KEY_VERSION, event.getHeaders().get(EventConstants.HEADER_KEY_VERSION));
+ result.put(ConfigConstants.REMOTE_IP_KEY, event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY));
result.put(ConfigConstants.DATAPROXY_IP_KEY, NetworkUtils.getLocalIp());
return result;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index e91f41f4b4..126ca385dc 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -52,7 +52,7 @@ import java.util.Set;
*/
public class KafkaHandler implements MessageQueueHandler {
- public static final Logger LOG = LoggerFactory.getLogger(KafkaHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(KafkaHandler.class);
// log print count
private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
@@ -88,11 +88,11 @@ public class KafkaHandler implements MessageQueueHandler {
Context context = this.sinkContext.getProducerContext();
props.putAll(context.getParameters());
props.putAll(config.getParams());
- LOG.info("try to create kafka client:{}", props);
+ logger.info("try to create kafka client:{}", props);
producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
- LOG.info("create new producer success:{}", producer);
+ logger.info("create new producer success:{}", producer);
} catch (Throwable e) {
- LOG.error(e.getMessage(), e);
+ logger.error(e.getMessage(), e);
}
}
@@ -108,7 +108,7 @@ public class KafkaHandler implements MessageQueueHandler {
public void stop() {
// kafka producer
this.producer.close();
- LOG.info("kafka handler stopped");
+ logger.info("kafka handler stopped");
}
/**
@@ -128,7 +128,7 @@ public class KafkaHandler implements MessageQueueHandler {
sinkContext.fileMetricIncWithDetailStats(
StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, profile.getUid());
sinkContext.addSendResultMetric(profile, clusterName, profile.getUid(), false, 0);
- sinkContext.getDispatchQueue().release(profile.getSize());
+ sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
return false;
}
@@ -136,7 +136,7 @@ public class KafkaHandler implements MessageQueueHandler {
if (StringUtils.isEmpty(topic)) {
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
sinkContext.addSendResultMetric(profile, clusterName, profile.getUid(), false, 0);
- sinkContext.getDispatchQueue().release(profile.getSize());
+ sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
return false;
}
@@ -162,7 +162,7 @@ public class KafkaHandler implements MessageQueueHandler {
sinkContext.processSendFail(profile, clusterName, profile.getUid(), 0,
DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, ex.getMessage());
if (logCounter.shouldPrint()) {
- LOG.error("Send Message to Kafka failure", ex);
+ logger.error("Send Message to Kafka failure", ex);
}
return false;
}
@@ -205,12 +205,12 @@ public class KafkaHandler implements MessageQueueHandler {
sinkContext.processSendFail(batchProfile, clusterName, topic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
- LOG.error("Send BatchPackProfile to Kafka failure", ex);
+ logger.error("Send BatchPackProfile to Kafka failure", ex);
}
} else {
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(batchProfile, clusterName, topic, true, sendTime);
- sinkContext.getDispatchQueue().release(batchProfile.getSize());
+ sinkContext.getMqZoneSink().releaseAcquiredSizePermit(batchProfile);
batchProfile.ack();
}
}
@@ -251,14 +251,14 @@ public class KafkaHandler implements MessageQueueHandler {
sinkContext.processSendFail(simpleProfile, clusterName, topic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
- LOG.error("Send SimplePackProfile to Kafka failure", ex);
+ logger.error("Send SimplePackProfile to Kafka failure", ex);
}
} else {
sinkContext.fileMetricAddSuccCnt(simpleProfile, topic,
arg0 == null ? "" : String.valueOf(arg0.partition()));
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(simpleProfile, clusterName, topic, true, sendTime);
- sinkContext.getDispatchQueue().release(simpleProfile.getSize());
+ sinkContext.getMqZoneSink().releaseAcquiredSizePermit(simpleProfile);
simpleProfile.ack();
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 3933045d74..66f8067014 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -63,7 +63,7 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_STATS_INTER
*/
public class PulsarHandler implements MessageQueueHandler {
- public static final Logger LOG = LoggerFactory.getLogger(PulsarHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(PulsarHandler.class);
// log print count
private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
@@ -166,9 +166,9 @@ public class PulsarHandler implements MessageQueueHandler {
.enableBatching(context.getBoolean(KEY_ENABLEBATCHING, true))
.compressionType(this.getPulsarCompressionType());
} catch (Throwable e) {
- LOG.error(e.getMessage(), e);
+ logger.error(e.getMessage(), e);
}
- LOG.info("pulsar handler started");
+ logger.info("pulsar handler started");
}
/**
@@ -180,15 +180,15 @@ public class PulsarHandler implements MessageQueueHandler {
try {
entry.getValue().close();
} catch (PulsarClientException e) {
- LOG.error(e.getMessage(), e);
+ logger.error(e.getMessage(), e);
}
}
try {
this.client.close();
} catch (PulsarClientException e) {
- LOG.error(e.getMessage(), e);
+ logger.error(e.getMessage(), e);
}
- LOG.info("pulsar handler stopped");
+ logger.info("pulsar handler stopped");
}
@Override
@@ -213,7 +213,7 @@ public class PulsarHandler implements MessageQueueHandler {
sinkContext.fileMetricIncWithDetailStats(
StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, profile.getUid());
sinkContext.addSendResultMetric(profile, clusterName, profile.getUid(), false, 0);
- sinkContext.getDispatchQueue().release(profile.getSize());
+ sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
return false;
}
@@ -221,7 +221,7 @@ public class PulsarHandler implements MessageQueueHandler {
if (StringUtils.isEmpty(producerTopic)) {
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
sinkContext.addSendResultMetric(profile, clusterName, profile.getUid(), false, 0);
- sinkContext.getDispatchQueue().release(profile.getSize());
+ sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
return false;
}
@@ -233,22 +233,22 @@ public class PulsarHandler implements MessageQueueHandler {
Producer<byte[]> producer = this.producerMap.get(producerTopic);
if (producer == null) {
try {
- LOG.info("try to new a object for topic " + producerTopic);
+ logger.info("try to new a object for topic " + producerTopic);
SecureRandom secureRandom = new SecureRandom(
(producerTopic + System.currentTimeMillis()).getBytes());
String producerName = producerTopic + "-" + secureRandom.nextLong();
producer = baseBuilder.clone().topic(producerTopic)
.producerName(producerName)
.create();
- LOG.info("create new producer success:{}", producer.getProducerName());
+ logger.info("create new producer success:{}", producer.getProducerName());
Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(producerTopic, producer);
if (oldProducer != null) {
producer.close();
- LOG.info("close producer success:{}", producer.getProducerName());
+ logger.info("close producer success:{}", producer.getProducerName());
producer = oldProducer;
}
} catch (Throwable ex) {
- LOG.error("create new producer failed", ex);
+ logger.error("create new producer failed", ex);
}
}
// create producer failed
@@ -270,7 +270,7 @@ public class PulsarHandler implements MessageQueueHandler {
sinkContext.processSendFail(profile, clusterName, profile.getUid(), 0,
DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, ex.getMessage());
if (logCounter.shouldPrint()) {
- LOG.error("Send Message to Pulsar failure", ex);
+ logger.error("Send Message to Pulsar failure", ex);
}
return false;
}
@@ -327,12 +327,12 @@ public class PulsarHandler implements MessageQueueHandler {
sinkContext.processSendFail(batchProfile, clusterName, producerTopic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
- LOG.error("Send ProfileV1 to Pulsar failure", ex);
+ logger.error("Send ProfileV1 to Pulsar failure", ex);
}
} else {
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(batchProfile, clusterName, producerTopic, true, sendTime);
- sinkContext.getDispatchQueue().release(batchProfile.getSize());
+ sinkContext.getMqZoneSink().releaseAcquiredSizePermit(batchProfile);
batchProfile.ack();
}
});
@@ -362,13 +362,13 @@ public class PulsarHandler implements MessageQueueHandler {
sinkContext.processSendFail(simpleProfile, clusterName, producerTopic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
- LOG.error("Send SimpleProfileV0 to Pulsar failure", ex);
+ logger.error("Send SimpleProfileV0 to Pulsar failure", ex);
}
} else {
sinkContext.fileMetricAddSuccCnt(simpleProfile, producerTopic, msgId.toString());
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(simpleProfile, clusterName, producerTopic, true, sendTime);
- sinkContext.getDispatchQueue().release(simpleProfile.getSize());
+ sinkContext.getMqZoneSink().releaseAcquiredSizePermit(simpleProfile);
simpleProfile.ack();
}
});
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index e8054f8863..dcd28d13c3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -55,7 +55,7 @@ import java.util.Set;
*/
public class TubeHandler implements MessageQueueHandler {
- public static final Logger LOG = LoggerFactory.getLogger(TubeHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(TubeHandler.class);
// log print count
private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
@@ -98,12 +98,12 @@ public class TubeHandler implements MessageQueueHandler {
try {
// prepare configuration
TubeClientConfig conf = initTubeConfig();
- LOG.info("try to create producer:{}", conf.toJsonString());
+ logger.info("try to create producer:{}", conf.toJsonString());
this.sessionFactory = new TubeMultiSessionFactory(conf);
this.producer = sessionFactory.createProducer();
- LOG.info("create new producer success:{}", producer);
+ logger.info("create new producer success:{}", producer);
} catch (Throwable e) {
- LOG.error(e.getMessage(), e);
+ logger.error(e.getMessage(), e);
}
}
@@ -116,10 +116,10 @@ public class TubeHandler implements MessageQueueHandler {
try {
published = producer.publish(newTopicSet);
this.topicSet.addAll(newTopicSet);
- LOG.info("Publish topics to {}, need publish are {}, published are {}",
+ logger.info("Publish topics to {}, need publish are {}, published are {}",
this.clusterName, newTopicSet, published);
} catch (Throwable e) {
- LOG.warn("Publish topics to {} failure", this.clusterName, e);
+ logger.warn("Publish topics to {} failure", this.clusterName, e);
}
}
@@ -166,17 +166,17 @@ public class TubeHandler implements MessageQueueHandler {
try {
this.producer.shutdown();
} catch (Throwable e) {
- LOG.error(e.getMessage(), e);
+ logger.error(e.getMessage(), e);
}
}
if (this.sessionFactory != null) {
try {
this.sessionFactory.shutdown();
} catch (TubeClientException e) {
- LOG.error(e.getMessage(), e);
+ logger.error(e.getMessage(), e);
}
}
- LOG.info("tube handler stopped");
+ logger.info("tube handler stopped");
}
/**
@@ -193,7 +193,7 @@ public class TubeHandler implements MessageQueueHandler {
sinkContext.fileMetricIncWithDetailStats(
StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, profile.getUid());
sinkContext.addSendResultMetric(profile, clusterName, profile.getUid(), false, 0);
- sinkContext.getDispatchQueue().release(profile.getSize());
+ sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
return false;
}
@@ -201,7 +201,7 @@ public class TubeHandler implements MessageQueueHandler {
if (StringUtils.isEmpty(topic)) {
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
sinkContext.addSendResultMetric(profile, clusterName, profile.getUid(), false, 0);
- sinkContext.getDispatchQueue().release(profile.getSize());
+ sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
return false;
}
@@ -233,7 +233,7 @@ public class TubeHandler implements MessageQueueHandler {
sinkContext.processSendFail(profile, clusterName, profile.getUid(), 0,
DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, ex.getMessage());
if (logCounter.shouldPrint()) {
- LOG.error("Send Message to Tube failure", ex);
+ logger.error("Send Message to Tube failure", ex);
}
return false;
}
@@ -270,14 +270,14 @@ public class TubeHandler implements MessageQueueHandler {
if (result.isSuccess()) {
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(batchProfile, clusterName, topic, true, sendTime);
- sinkContext.getDispatchQueue().release(batchProfile.getSize());
+ sinkContext.getMqZoneSink().releaseAcquiredSizePermit(batchProfile);
batchProfile.ack();
} else {
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILURE);
sinkContext.processSendFail(batchProfile, clusterName, topic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, result.getErrMsg());
if (logCounter.shouldPrint()) {
- LOG.error("Send ProfileV1 to tube failure {}", result.getErrMsg());
+ logger.error("Send ProfileV1 to tube failure {}", result.getErrMsg());
}
}
}
@@ -288,7 +288,7 @@ public class TubeHandler implements MessageQueueHandler {
sinkContext.processSendFail(batchProfile, clusterName, topic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
- LOG.error("Send ProfileV1 to tube exception", ex);
+ logger.error("Send ProfileV1 to tube exception", ex);
}
}
};
@@ -319,7 +319,7 @@ public class TubeHandler implements MessageQueueHandler {
sinkContext.fileMetricAddSuccCnt(simpleProfile, topic, result.getPartition().getHost());
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(simpleProfile, clusterName, topic, true, sendTime);
- sinkContext.getDispatchQueue().release(simpleProfile.getSize());
+ sinkContext.getMqZoneSink().releaseAcquiredSizePermit(simpleProfile);
simpleProfile.ack();
} else {
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_FAILURE,
@@ -328,7 +328,7 @@ public class TubeHandler implements MessageQueueHandler {
sinkContext.processSendFail(simpleProfile, clusterName, topic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, result.getErrMsg());
if (logCounter.shouldPrint()) {
- LOG.error("Send SimpleProfileV0 to tube failure: {}", result.getErrMsg());
+ logger.error("Send SimpleProfileV0 to tube failure: {}", result.getErrMsg());
}
}
}
@@ -340,7 +340,7 @@ public class TubeHandler implements MessageQueueHandler {
sinkContext.processSendFail(simpleProfile, clusterName, topic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
- LOG.error("Send SimpleProfileV0 to tube exception", ex);
+ logger.error("Send SimpleProfileV0 to tube exception", ex);
}
}
};
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v1msg/InlongTcpSourceCallback.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v1msg/InlongTcpSourceCallback.java
index 1acb241a6d..c11cffa63e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v1msg/InlongTcpSourceCallback.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v1msg/InlongTcpSourceCallback.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.source2.v1msg;
+import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePackHeader;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResponseInfo;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
@@ -37,8 +38,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class InlongTcpSourceCallback implements SourceCallback {
- public static final Logger LOG = LoggerFactory.getLogger(InlongTcpSourceCallback.class);
-
+ private static final Logger logger = LoggerFactory.getLogger(InlongTcpSourceCallback.class);
+ // log print count
+ private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
private final ChannelHandlerContext ctx;
private final MessagePackHeader header;
private final CountDownLatch latch;
@@ -47,8 +49,8 @@ public class InlongTcpSourceCallback implements SourceCallback {
/**
* Constructor
*
- * @param ctx
- * @param header
+ * @param ctx the channel context
+ * @param header the message pack header
*/
public InlongTcpSourceCallback(ChannelHandlerContext ctx, MessagePackHeader header) {
this.ctx = ctx;
@@ -59,7 +61,7 @@ public class InlongTcpSourceCallback implements SourceCallback {
/**
* callback
*
- * @param resultCode
+ * @param resultCode the result code
*/
@Override
public void callback(ResultCode resultCode) {
@@ -82,13 +84,17 @@ public class InlongTcpSourceCallback implements SourceCallback {
if (remoteChannel.isWritable()) {
remoteChannel.write(buffer);
} else {
- LOG.warn("the send buffer2 is full, so disconnect it!"
- + "please check remote client; Connection info:{}",
- remoteChannel);
+ if (logCounter.shouldPrint()) {
+ logger.warn("the send buffer2 is full, so disconnect it!"
+ + " please check remote client; Connection info:{}",
+ remoteChannel);
+ }
buffer.release();
}
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ } catch (Throwable e) {
+ if (logCounter.shouldPrint()) {
+ logger.error("Send response failure", e);
+ }
} finally {
// notice TCP session
this.latch.countDown();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
index 33e51485a6..15eea923f5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
@@ -27,14 +27,14 @@ public class BufferQueue<A> {
private final LinkedBlockingQueue<A> queue;
private final SizeSemaphore currentTokens;
- private SizeSemaphore globalTokens;
+ private SizeSemaphore globalTokens = null;
private final AtomicLong offerCount = new AtomicLong(0);
private final AtomicLong pollCount = new AtomicLong(0);
/**
* Constructor
*
- * @param maxSizeKb
+ * @param maxSizeKb the initial size of permits to acquire
*/
public BufferQueue(int maxSizeKb) {
this.queue = new LinkedBlockingQueue<>();
@@ -44,8 +44,8 @@ public class BufferQueue<A> {
/**
* Constructor
*
- * @param maxSizeKb
- * @param globalTokens
+ * @param maxSizeKb the initial size of permits to acquire
+ * @param globalTokens the global permit semaphore
*/
public BufferQueue(int maxSizeKb, SizeSemaphore globalTokens) {
this(maxSizeKb);
@@ -109,42 +109,49 @@ public class BufferQueue<A> {
}
/**
- * tryAcquire
+ * try to acquire required size permit
+ *
+ * @param sizeInByte the size of permits to acquire
+ * @return true if the permits were acquired and false otherwise
*/
public boolean tryAcquire(long sizeInByte) {
- boolean cidResult = currentTokens.tryAcquire(sizeInByte);
- if (!cidResult) {
- return false;
- }
if (this.globalTokens == null) {
- return true;
- }
- boolean globalResult = this.globalTokens.tryAcquire(sizeInByte);
- if (globalResult) {
- return true;
+ return currentTokens.tryAcquire(sizeInByte);
+ } else {
+ if (!this.globalTokens.tryAcquire(sizeInByte)) {
+ return false;
+ }
+ if (currentTokens.tryAcquire(sizeInByte)) {
+ return true;
+ } else {
+ this.globalTokens.release(sizeInByte);
+ return false;
+ }
}
- currentTokens.release(sizeInByte);
- return false;
}
/**
- * acquire
+ * acquire size permit
+ *
+ * @param sizeInByte the size of permits to acquire
*/
public void acquire(long sizeInByte) {
- currentTokens.acquire(sizeInByte);
if (this.globalTokens != null) {
globalTokens.acquire(sizeInByte);
}
+ currentTokens.acquire(sizeInByte);
}
/**
- * release
+ * release size permit
+ *
+ * @param sizeInByte the size of permits to release
*/
public void release(long sizeInByte) {
+ this.currentTokens.release(sizeInByte);
if (this.globalTokens != null) {
this.globalTokens.release(sizeInByte);
}
- this.currentTokens.release(sizeInByte);
}
/**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java
index 99b1b35e86..adb4881b68 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java
@@ -30,7 +30,7 @@ public class SizeSemaphore {
private int maxSize = 0;
private int leftSize = 0;
private Semaphore sizeSemaphore = null;
- private AtomicInteger leftSemaphore = new AtomicInteger(0);
+ private final AtomicInteger leftSemaphore = new AtomicInteger(0);
/**
* Constructor