You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2023/06/21 02:25:30 UTC

[inlong] branch master updated: [INLONG-8294][DataProxy] Optimize the log output in the Sink module (#8295)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0936f81cb7 [INLONG-8294][DataProxy] Optimize the log output in the Sink module (#8295)
0936f81cb7 is described below

commit 0936f81cb73d85bba7fbc081b9462e0916eb2cb1
Author: Goson Zhang <46...@qq.com>
AuthorDate: Wed Jun 21 10:25:24 2023 +0800

    [INLONG-8294][DataProxy] Optimize the log output in the Sink module (#8295)
---
 .../inlong/dataproxy/sink/common/SinkContext.java  | 68 +++------------
 .../inlong/dataproxy/sink/mq/BatchPackManager.java | 68 +++++++--------
 .../sink/mq/BatchPackProfileCallback.java          |  4 +-
 .../sink/mq/MessageQueueClusterProducer.java       |  2 +-
 .../sink/mq/MessageQueueZoneProducer.java          | 18 ++--
 .../dataproxy/sink/mq/MessageQueueZoneSink.java    | 96 +++++++++++++++-------
 .../sink/mq/MessageQueueZoneSinkContext.java       | 31 ++++---
 .../dataproxy/sink/mq/MessageQueueZoneWorker.java  | 43 ++++++----
 .../inlong/dataproxy/sink/mq/PackProfile.java      | 12 +--
 .../sink/mq/RandomCacheClusterSelector.java        |  2 +-
 .../dataproxy/sink/mq/SimplePackProfile.java       |  9 +-
 .../dataproxy/sink/mq/kafka/KafkaHandler.java      | 24 +++---
 .../dataproxy/sink/mq/pulsar/PulsarHandler.java    | 34 ++++----
 .../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 36 ++++----
 .../source2/v1msg/InlongTcpSourceCallback.java     | 26 +++---
 .../apache/inlong/dataproxy/utils/BufferQueue.java | 47 ++++++-----
 .../inlong/dataproxy/utils/SizeSemaphore.java      |  2 +-
 17 files changed, 262 insertions(+), 260 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
index 102c88a37d..9ccdb82a05 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
@@ -25,9 +25,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.stats.MonitorIndex;
 import org.apache.inlong.dataproxy.metrics.stats.MonitorStats;
 import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
-import org.apache.inlong.dataproxy.sink.mq.PackProfile;
 import org.apache.inlong.dataproxy.sink.mq.pulsar.PulsarHandler;
-import org.apache.inlong.dataproxy.utils.BufferQueue;
 
 import org.apache.commons.lang.ClassUtils;
 import org.apache.flume.Channel;
@@ -35,22 +33,18 @@ import org.apache.flume.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Date;
-import java.util.Timer;
-import java.util.TimerTask;
-
 /**
  * SinkContext
  */
 public class SinkContext {
 
-    public static final Logger LOG = LoggerFactory.getLogger(SinkContext.class);
-
     public static final String KEY_MAX_THREADS = "maxThreads";
     public static final String KEY_PROCESSINTERVAL = "processInterval";
     public static final String KEY_RELOADINTERVAL = "reloadInterval";
     public static final String KEY_MESSAGE_QUEUE_HANDLER = "messageQueueHandler";
 
+    protected static final Logger logger = LoggerFactory.getLogger(SinkContext.class);
+
     protected final String clusterId;
     protected final String sinkName;
     protected final Context sinkContext;
@@ -62,7 +56,6 @@ public class SinkContext {
     protected final long reloadInterval;
     //
     protected final DataProxyMetricItemSet metricItemSet;
-    protected Timer reloadTimer;
     // file metric statistic
     protected MonitorIndex monitorIndex = null;
     private MonitorStats monitorStats = null;
@@ -100,23 +93,12 @@ public class SinkContext {
             this.monitorIndex.start();
             this.monitorStats.start();
         }
-        try {
-            this.reload();
-            this.setReloadTimer();
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
     }
 
     /**
      * close
      */
     public void close() {
-        try {
-            this.reloadTimer.cancel();
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
         // stop file statistic index
         if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
             if (monitorIndex != null) {
@@ -153,26 +135,6 @@ public class SinkContext {
         }
     }
 
-    /**
-     * setReloadTimer
-     */
-    protected void setReloadTimer() {
-        reloadTimer = new Timer(true);
-        TimerTask task = new TimerTask() {
-
-            public void run() {
-                reload();
-            }
-        };
-        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
-    }
-
-    /**
-     * reload
-     */
-    public void reload() {
-    }
-
     /**
      * get clusterId
      * 
@@ -202,7 +164,7 @@ public class SinkContext {
 
     /**
      * get channel
-     * 
+     *
      * @return the channel
      */
     public Channel getChannel() {
@@ -255,12 +217,11 @@ public class SinkContext {
             Class<?> handlerClass = ClassUtils.getClass(eventHandlerClass);
             Object handlerObject = handlerClass.getDeclaredConstructor().newInstance();
             if (handlerObject instanceof EventHandler) {
-                EventHandler handler = (EventHandler) handlerObject;
-                return handler;
+                return (EventHandler) handlerObject;
             }
         } catch (Throwable t) {
-            LOG.error("Fail to init EventHandler,handlerClass:{},error:{}",
-                    eventHandlerClass, t.getMessage(), t);
+            logger.error("{} fail to init EventHandler,handlerClass:{},error:{}",
+                    this.sinkName, eventHandlerClass, t.getMessage(), t);
         }
         return null;
     }
@@ -271,26 +232,17 @@ public class SinkContext {
     public MessageQueueHandler createMessageQueueHandler(CacheClusterConfig config) {
         String strHandlerClass = config.getParams().getOrDefault(KEY_MESSAGE_QUEUE_HANDLER,
                 PulsarHandler.class.getName());
-        LOG.info("mq handler class = {}", strHandlerClass);
+        logger.info("{}'s mq handler class = {}", this.sinkName, strHandlerClass);
         try {
             Class<?> handlerClass = ClassUtils.getClass(strHandlerClass);
             Object handlerObject = handlerClass.getDeclaredConstructor().newInstance();
             if (handlerObject instanceof MessageQueueHandler) {
-                MessageQueueHandler handler = (MessageQueueHandler) handlerObject;
-                return handler;
+                return (MessageQueueHandler) handlerObject;
             }
         } catch (Throwable t) {
-            LOG.error("Fail to init MessageQueueHandler,handlerClass:{},error:{}",
-                    strHandlerClass, t.getMessage(), t);
+            logger.error("{} fail to init MessageQueueHandler,handlerClass:{},error:{}",
+                    this.sinkName, strHandlerClass, t.getMessage(), t);
         }
         return null;
     }
-
-    /**
-     * createBufferQueue
-     * @return
-     */
-    public static BufferQueue<PackProfile> createBufferQueue() {
-        return new BufferQueue<>(CommonConfigHolder.getInstance().getMaxBufferQueueSizeKb());
-    }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
index f1b4f54473..db671d1ad9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.dataproxy.sink.mq;
 
 import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.dataproxy.utils.BufferQueue;
 import org.apache.inlong.sdk.commons.protocol.InlongId;
 import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
 import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
@@ -42,7 +41,8 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class BatchPackManager {
 
-    public static final Logger LOG = LoggerFactory.getLogger(BatchPackManager.class);
+    private static final Logger logger = LoggerFactory.getLogger(BatchPackManager.class);
+
     public static final String KEY_DISPATCH_TIMEOUT = "dispatchTimeout";
     public static final String KEY_DISPATCH_MAX_PACKCOUNT = "dispatchMaxPackCount";
     public static final String KEY_DISPATCH_MAX_PACKSIZE = "dispatchMaxPackSize";
@@ -54,8 +54,7 @@ public class BatchPackManager {
     private final long dispatchTimeout;
     private final long maxPackCount;
     private final long maxPackSize;
-    private final String sinkName;
-    private final BufferQueue<PackProfile> dispatchQueue;
+    private final MessageQueueZoneSink mqZoneSink;
     private final ConcurrentHashMap<String, PackProfile> profileCache = new ConcurrentHashMap<>();
     // flag that manager need to output overtime data.
     private final AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
@@ -65,13 +64,11 @@ public class BatchPackManager {
     /**
      * Constructor
      *
-     * @param sinkName the sink name
+     * @param mqZoneSink the mq zone sink
      * @param context the process context
-     * @param dispatchQueue  the batch queue
      */
-    public BatchPackManager(String sinkName, Context context, BufferQueue<PackProfile> dispatchQueue) {
-        this.sinkName = sinkName;
-        this.dispatchQueue = dispatchQueue;
+    public BatchPackManager(MessageQueueZoneSink mqZoneSink, Context context) {
+        this.mqZoneSink = mqZoneSink;
         this.dispatchTimeout = context.getLong(KEY_DISPATCH_TIMEOUT, DEFAULT_DISPATCH_TIMEOUT);
         this.maxPackCount = context.getLong(KEY_DISPATCH_MAX_PACKCOUNT, DEFAULT_DISPATCH_MAX_PACKCOUNT);
         this.maxPackSize = context.getLong(KEY_DISPATCH_MAX_PACKSIZE, DEFAULT_DISPATCH_MAX_PACKSIZE);
@@ -89,22 +86,22 @@ public class BatchPackManager {
         // find dispatch profile
         PackProfile dispatchProfile = this.profileCache.get(dispatchKey);
         if (dispatchProfile == null) {
-            dispatchProfile = new BatchPackProfile(eventUid, event.getInlongGroupId(), event.getInlongStreamId(),
-                    dispatchTime);
+            dispatchProfile = new BatchPackProfile(eventUid, event.getInlongGroupId(),
+                    event.getInlongStreamId(), dispatchTime);
             this.profileCache.put(dispatchKey, dispatchProfile);
         }
         // add event
-        boolean addResult = dispatchProfile.addEvent(event, maxPackCount, maxPackSize);
-        if (!addResult) {
+        if (!dispatchProfile.addEvent(event, maxPackCount, maxPackSize)) {
             BatchPackProfile newDispatchProfile = new BatchPackProfile(eventUid, event.getInlongGroupId(),
                     event.getInlongStreamId(), dispatchTime);
             PackProfile oldDispatchProfile = this.profileCache.put(dispatchKey, newDispatchProfile);
-            this.dispatchQueue.acquire(oldDispatchProfile.getSize());
-            this.dispatchQueue.offer(oldDispatchProfile);
-            outCounter.addAndGet(dispatchProfile.getCount());
+            if (oldDispatchProfile != null) {
+                this.mqZoneSink.acquireAndOfferDispatchedRecord(oldDispatchProfile);
+            }
+            this.outCounter.addAndGet(dispatchProfile.getCount());
             newDispatchProfile.addEvent(event, maxPackCount, maxPackSize);
         }
-        inCounter.incrementAndGet();
+        this.inCounter.incrementAndGet();
     }
 
     /**
@@ -122,24 +119,21 @@ public class BatchPackManager {
         dispatchProfile.setCallback(callback);
         // offer queue
         for (ProxyEvent event : packEvent.getEvents()) {
-            inCounter.incrementAndGet();
-            boolean addResult = dispatchProfile.addEvent(event, maxPackCount, maxPackSize);
-            // dispatch profile is full
-            if (!addResult) {
-                outCounter.addAndGet(dispatchProfile.getCount());
-                this.dispatchQueue.acquire(dispatchProfile.getSize());
-                this.dispatchQueue.offer(dispatchProfile);
+            if (!dispatchProfile.addEvent(event, maxPackCount, maxPackSize)) {
+                // dispatch profile is full
+                this.outCounter.addAndGet(dispatchProfile.getCount());
+                this.mqZoneSink.acquireAndOfferDispatchedRecord(dispatchProfile);
                 dispatchProfile = new BatchPackProfile(eventUid, event.getInlongGroupId(), event.getInlongStreamId(),
                         dispatchTime);
                 dispatchProfile.setCallback(callback);
                 dispatchProfile.addEvent(event, maxPackCount, maxPackSize);
             }
+            this.inCounter.incrementAndGet();
         }
         // last dispatch profile
         if (dispatchProfile.getEvents().size() > 0) {
-            outCounter.addAndGet(dispatchProfile.getCount());
-            this.dispatchQueue.acquire(dispatchProfile.getSize());
-            this.dispatchQueue.offer(dispatchProfile);
+            this.outCounter.addAndGet(dispatchProfile.getCount());
+            this.mqZoneSink.acquireAndOfferDispatchedRecord(dispatchProfile);
         }
     }
 
@@ -156,10 +150,9 @@ public class BatchPackManager {
         long dispatchTime = msgTime - msgTime % MINUTE_MS;
         SimplePackProfile profile = new SimplePackProfile(uid, inlongGroupId, inlongStreamId, dispatchTime);
         profile.addEvent(event, maxPackCount, maxPackSize);
-        this.dispatchQueue.acquire(profile.getSize());
-        this.dispatchQueue.offer(profile);
-        outCounter.addAndGet(profile.getCount());
-        inCounter.incrementAndGet();
+        this.mqZoneSink.acquireAndOfferDispatchedRecord(profile);
+        this.outCounter.addAndGet(profile.getCount());
+        this.inCounter.incrementAndGet();
     }
 
     /**
@@ -171,7 +164,7 @@ public class BatchPackManager {
             return;
         }
         int profileSize = profileCache.size();
-        int dispatchSize = dispatchQueue.size();
+        int dispatchSize = this.mqZoneSink.getDispatchQueueSize();
         long currentTime = System.currentTimeMillis();
         long createThreshold = currentTime - dispatchTimeout;
         List<String> removeKeys = new ArrayList<>();
@@ -188,19 +181,18 @@ public class BatchPackManager {
         removeKeys.forEach((key) -> {
             PackProfile dispatchProfile = this.profileCache.remove(key);
             if (dispatchProfile != null) {
-                this.dispatchQueue.acquire(dispatchProfile.getSize());
-                dispatchQueue.offer(dispatchProfile);
-                outCounter.addAndGet(dispatchProfile.getCount());
+                this.mqZoneSink.acquireAndOfferDispatchedRecord(dispatchProfile);
+                this.outCounter.addAndGet(dispatchProfile.getCount());
             }
         });
         long hisInCnt = inCounter.getAndSet(0);
         long hisOutCnt = outCounter.getAndSet(0);
         if (!removeKeys.isEmpty()) {
-            LOG.info("{} output overtime data, profileCacheSize: before={}, after={},"
+            logger.info("{} output overtime data, profileCacheSize: before={}, after={},"
                     + " dispatchQueueSize: before={}, after={}, eventCount: {},"
                     + " inCounter: {}, outCounter: {}",
-                    sinkName, profileSize, profileCache.size(), dispatchSize, dispatchQueue.size(),
-                    eventCount, hisInCnt, hisOutCnt);
+                    mqZoneSink.getName(), profileSize, profileCache.size(), dispatchSize,
+                    this.mqZoneSink.getDispatchQueueSize(), eventCount, hisInCnt, hisOutCnt);
         }
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfileCallback.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfileCallback.java
index e3b2f480c9..c11d864a3d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfileCallback.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfileCallback.java
@@ -28,8 +28,8 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class BatchPackProfileCallback {
 
-    private AtomicInteger ackingCount;
-    private SourceCallback callback;
+    private final AtomicInteger ackingCount;
+    private final SourceCallback callback;
 
     /**
      * Constructor
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueClusterProducer.java
index 46bdabdeee..77cf27b24f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueClusterProducer.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueClusterProducer.java
@@ -31,7 +31,7 @@ import java.util.Set;
  */
 public class MessageQueueClusterProducer implements LifecycleAware {
 
-    public static final Logger LOG = LoggerFactory.getLogger(MessageQueueClusterProducer.class);
+    private static final Logger logger = LoggerFactory.getLogger(MessageQueueClusterProducer.class);
 
     private final String workerName;
     private final CacheClusterConfig config;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
index 6ef2576767..be20c21ccf 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
@@ -39,7 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class MessageQueueZoneProducer {
 
-    public static final Logger LOG = LoggerFactory.getLogger(MessageQueueZoneProducer.class);
+    private static final Logger logger = LoggerFactory.getLogger(MessageQueueZoneProducer.class);
     private static final long MAX_RESERVED_TIME = 60 * 1000L;
     private final MessageQueueZoneSink zoneSink;
     private final MessageQueueZoneSinkContext context;
@@ -71,10 +71,10 @@ public class MessageQueueZoneProducer {
      */
     public void start() {
         try {
-            LOG.info("start MessageQueueZoneProducer:{}", zoneSink.getName());
+            logger.info("start MessageQueueZoneProducer:{}", zoneSink.getName());
             this.reloadMetaConfig();
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            logger.error(e.getMessage(), e);
         }
     }
 
@@ -140,7 +140,7 @@ public class MessageQueueZoneProducer {
                 tmpProducer.stop();
             }
         }
-        LOG.info("Clear {}'s expired cluster producer {}", zoneSink.getName(), expired);
+        logger.info("{} cleared expired cluster producer {}", zoneSink.getName(), expired);
     }
 
     /**
@@ -264,17 +264,17 @@ public class MessageQueueZoneProducer {
                 return;
             }
             if (zoneSink.isMqClusterStarted()) {
-                LOG.info("Reload {}'s cluster info, current cluster are {}, removed {}, created {}",
+                logger.info("{} reload cluster info, current cluster are {}, removed {}, created {}",
                         zoneSink.getName(), lastClusterNames, needRmvs, addedItems);
             } else {
                 zoneSink.setMQClusterStarted();
                 ConfigManager.getInstance().setMqClusterReady();
-                LOG.info(
-                        "Reload {}'s cluster info, and updated sink status, current cluster are {}, removed {}, created {}",
+                logger.info(
+                        "{} reload cluster info, and updated sink status, current cluster are {}, removed {}, created {}",
                         zoneSink.getName(), lastClusterNames, needRmvs, addedItems);
             }
         } catch (Throwable e) {
-            LOG.error("Reload cluster info failure", e);
+            logger.error("{} reload cluster info failure", zoneSink.getName(), e);
         }
     }
 
@@ -283,7 +283,7 @@ public class MessageQueueZoneProducer {
         if (curTopicSet.isEmpty() || lastRefreshTopics.equals(curTopicSet)) {
             return;
         }
-        LOG.info("Reload {}'s topics changed, current topics are {}, last topics are {}",
+        logger.info("{} reload topics changed, current topics are {}, last topics are {}",
                 zoneSink.getName(), curTopicSet, lastRefreshTopics);
         lastRefreshTopics.addAll(curTopicSet);
         for (MessageQueueClusterProducer clusterProducer : this.usingClusterMap.values()) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index 0a91655ce4..aee7d09c8f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -17,9 +17,10 @@
 
 package org.apache.inlong.dataproxy.sink.mq;
 
+import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.sink.common.SinkContext;
 import org.apache.inlong.dataproxy.utils.BufferQueue;
 import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
 import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
@@ -47,7 +48,9 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class MessageQueueZoneSink extends AbstractSink implements Configurable, ConfigUpdateCallback {
 
-    public static final Logger LOG = LoggerFactory.getLogger(MessageQueueZoneSink.class);
+    private static final Logger logger = LoggerFactory.getLogger(MessageQueueZoneSink.class);
+    // log print count
+    private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
 
     private final long MQ_CLUSTER_STATUS_CHECK_DUR_MS = 2000L;
 
@@ -56,7 +59,8 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
     private final List<MessageQueueZoneWorker> workers = new ArrayList<>();
     // message group
     private BatchPackManager dispatchManager;
-    private BufferQueue<PackProfile> dispatchQueue;
+    private final BufferQueue<PackProfile> dispatchQueue =
+            new BufferQueue<>(CommonConfigHolder.getInstance().getMaxBufferQueueSizeKb());
     // scheduled thread pool
     // reload
     // dispatch
@@ -71,6 +75,7 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
     private volatile boolean isShutdown = false;
     // whether mq cluster connected
     private volatile boolean mqClusterStarted = false;
+
     /**
      * configure
      * 
@@ -78,7 +83,7 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
      */
     @Override
     public void configure(Context context) {
-        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        logger.info("{} start to configure, context:{}.", this.getName(), context.toString());
         this.parentContext = context;
     }
 
@@ -87,16 +92,14 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
      */
     @Override
     public void start() {
+        if (getChannel() == null) {
+            logger.error("{}'s channel is null", this.getName());
+        }
         try {
             ConfigManager.getInstance().regMetaConfigChgCallback(this);
-            // build dispatch queue
-            this.dispatchQueue = SinkContext.createBufferQueue();
-            this.context = new MessageQueueZoneSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
-            if (getChannel() == null) {
-                LOG.error(getName() + "'s channel is null");
-            }
+            this.context = new MessageQueueZoneSinkContext(this, parentContext, getChannel());
             this.context.start();
-            this.dispatchManager = new BatchPackManager(getName(), parentContext, dispatchQueue);
+            this.dispatchManager = new BatchPackManager(this, parentContext);
             this.scheduledPool = Executors.newScheduledThreadPool(2);
             // dispatch
             this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
@@ -112,16 +115,18 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
             this.zoneProducer.start();
             // start configure change listener thread
             this.configListener = new Thread(new ConfigChangeProcessor());
-            this.configListener.setName(getName() + " configure listener");
+            this.configListener.setName(getName() + "-configure-listener");
             this.configListener.start();
             // create worker
+            MessageQueueZoneWorker zoneWorker;
             for (int i = 0; i < context.getMaxThreads(); i++) {
-                MessageQueueZoneWorker worker = new MessageQueueZoneWorker(this.getName(), i, context, zoneProducer);
-                worker.start();
-                this.workers.add(worker);
+                zoneWorker = new MessageQueueZoneWorker(this, i,
+                        context.getProcessInterval(), zoneProducer);
+                zoneWorker.start();
+                this.workers.add(zoneWorker);
             }
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            logger.error("{} start failure", this.getName(), e);
         }
         super.start();
     }
@@ -147,7 +152,7 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
             try {
                 worker.close();
             } catch (Throwable e) {
-                LOG.error(e.getMessage(), e);
+                logger.error("{} stop Zone worker failure", this.getName(), e);
             }
         }
         this.context.close();
@@ -208,17 +213,53 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
             this.context.addSendFailMetric();
             return Status.READY;
         } catch (Throwable t) {
-            LOG.error("Process event failed!" + this.getName(), t);
+            if (logCounter.shouldPrint()) {
+                logger.error("{} process event failed!", this.getName(), t);
+            }
             try {
                 tx.rollback();
             } catch (Throwable e) {
-                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+                if (logCounter.shouldPrint()) {
+                    logger.error("{} channel take transaction rollback exception", this.getName(), e);
+                }
             }
             return Status.BACKOFF;
         } finally {
             tx.close();
         }
     }
+    public boolean isMqClusterStarted() {
+        return mqClusterStarted;
+    }
+
+    public void setMQClusterStarted() {
+        this.mqClusterStarted = true;
+    }
+
+    public void acquireAndOfferDispatchedRecord(PackProfile record) {
+        this.dispatchQueue.acquire(record.getSize());
+        this.dispatchQueue.offer(record);
+    }
+
+    public void offerDispatchRecord(PackProfile record) {
+        this.dispatchQueue.offer(record);
+    }
+
+    public PackProfile pollDispatchedRecord() {
+        return this.dispatchQueue.pollRecord();
+    }
+
+    public void releaseAcquiredSizePermit(PackProfile record) {
+        this.dispatchQueue.release(record.getSize());
+    }
+
+    public int getDispatchQueueSize() {
+        return this.dispatchQueue.size();
+    }
+
+    public int getDispatchAvailablePermits() {
+        return this.dispatchQueue.availablePermits();
+    }
 
     @Override
     public void update() {
@@ -229,14 +270,13 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
         syncLock.notifyAll();
     }
 
-    public boolean isMqClusterStarted() {
-        return mqClusterStarted;
-    }
-
-    public void setMQClusterStarted() {
-        this.mqClusterStarted = true;
-    }
-
+    /**
+     * ConfigChangeProcessor
+     *
+     * Metadata configuration change listener: when a metadata change notification
+     * arrives, it checks and updates the mapping relationship between the MQ cluster
+     * information and the configured inlongId-to-topic mapping.
+     */
     private class ConfigChangeProcessor implements Runnable {
 
         @Override
@@ -246,7 +286,7 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
                 try {
                     syncLock.wait();
                 } catch (InterruptedException e) {
-                    LOG.error("{} config-change processor meet interrupt, exit!", getName());
+                    logger.error("{} config-change processor meet interrupt, exit!", getName());
                     break;
                 } catch (Throwable e2) {
                     //
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 9b60cffa3d..becfade7bd 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -26,7 +26,6 @@ import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.common.SinkContext;
-import org.apache.inlong.dataproxy.utils.BufferQueue;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
 
 import org.apache.commons.lang.ClassUtils;
@@ -49,8 +48,7 @@ public class MessageQueueZoneSinkContext extends SinkContext {
     public static final String PREFIX_PRODUCER = "producer.";
     public static final String KEY_COMPRESS_TYPE = "compressType";
 
-    private final BufferQueue<PackProfile> dispatchQueue;
-
+    private final MessageQueueZoneSink mqZoneSink;
     private final String proxyClusterId;
     private final String nodeId;
     private final Context producerContext;
@@ -60,10 +58,9 @@ public class MessageQueueZoneSinkContext extends SinkContext {
     /**
      * Constructor
      */
-    public MessageQueueZoneSinkContext(String sinkName, Context context, Channel channel,
-            BufferQueue<PackProfile> dispatchQueue) {
-        super(sinkName, context, channel);
-        this.dispatchQueue = dispatchQueue;
+    public MessageQueueZoneSinkContext(MessageQueueZoneSink mqZoneSink, Context context, Channel channel) {
+        super(mqZoneSink.getName(), context, channel);
+        this.mqZoneSink = mqZoneSink;
         // proxyClusterId
         this.proxyClusterId = CommonConfigHolder.getInstance().getClusterName();
         // nodeId
@@ -100,12 +97,12 @@ public class MessageQueueZoneSinkContext extends SinkContext {
     }
 
     /**
-     * get dispatchQueue
-     * 
-     * @return the dispatchQueue
+     * get message queue zone sink
+     *
+     * @return the zone sink
      */
-    public BufferQueue<PackProfile> getDispatchQueue() {
-        return dispatchQueue;
+    public MessageQueueZoneSink getMqZoneSink() {
+        return mqZoneSink;
     }
 
     /**
@@ -245,12 +242,13 @@ public class MessageQueueZoneSinkContext extends SinkContext {
             String mqName, String topic, long sendTime,
             DataProxyErrCode errCode, String errMsg) {
         if (currentRecord.isResend()) {
-            dispatchQueue.offer(currentRecord);
+            this.mqZoneSink.offerDispatchRecord(currentRecord);
             fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILRETRY);
             this.addSendResultMetric(currentRecord, mqName, topic, false, sendTime);
         } else {
-            currentRecord.fail(errCode, errMsg);
+            this.mqZoneSink.releaseAcquiredSizePermit(currentRecord);
             fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILDROPPED);
+            currentRecord.fail(errCode, errMsg);
         }
     }
 
@@ -267,11 +265,10 @@ public class MessageQueueZoneSinkContext extends SinkContext {
                 configurable.configure(new Context(CommonConfigHolder.getInstance().getProperties()));
             }
             if (selectorObject instanceof CacheClusterSelector) {
-                CacheClusterSelector selector = (CacheClusterSelector) selectorObject;
-                return selector;
+                return (CacheClusterSelector) selectorObject;
             }
         } catch (Throwable t) {
-            LOG.error("Fail to init CacheClusterSelector,selectorClass:{},error:{}",
+            logger.error("Fail to init CacheClusterSelector,selectorClass:{},error:{}",
                     strSelectorClass, t.getMessage(), t);
         }
         return null;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
index 8a2e4c5abe..ad61167295 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.dataproxy.sink.mq;
 
+import org.apache.inlong.common.monitor.LogCounter;
+
 import org.apache.flume.lifecycle.LifecycleState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,22 +28,24 @@ import org.slf4j.LoggerFactory;
  */
 public class MessageQueueZoneWorker extends Thread {
 
-    public static final Logger LOG = LoggerFactory.getLogger(MessageQueueZoneWorker.class);
-
+    private static final Logger logger = LoggerFactory.getLogger(MessageQueueZoneWorker.class);
+    // log print count
+    private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
     private final String workerName;
-    private final MessageQueueZoneSinkContext context;
-
-    private MessageQueueZoneProducer zoneProducer;
+    private final long fetchWaitMs;
+    private final MessageQueueZoneSink mqZoneSink;
+    private final MessageQueueZoneProducer zoneProducer;
     private LifecycleState status;
 
     /**
      * Constructor
      */
-    public MessageQueueZoneWorker(String sinkName, int workerIndex, MessageQueueZoneSinkContext context,
-            MessageQueueZoneProducer zoneProducer) {
+    public MessageQueueZoneWorker(MessageQueueZoneSink mqZoneSink, int workerIndex,
+            long fetchWaitMs, MessageQueueZoneProducer zoneProducer) {
         super();
-        this.workerName = sinkName + "-worker-" + workerIndex;
-        this.context = context;
+        this.mqZoneSink = mqZoneSink;
+        this.workerName = mqZoneSink.getName() + "-worker-" + workerIndex;
+        this.fetchWaitMs = fetchWaitMs;
         this.zoneProducer = zoneProducer;
         this.status = LifecycleState.IDLE;
     }
@@ -70,25 +74,28 @@ public class MessageQueueZoneWorker extends Thread {
      */
     @Override
     public void run() {
-        LOG.info(String.format("start MessageQueueZoneWorker:%s", this.workerName));
+        logger.info("{} start message zone worker", this.workerName);
+        PackProfile profile = null;
         while (status != LifecycleState.STOP) {
-            PackProfile profile = null;
             try {
-                profile = context.getDispatchQueue().pollRecord();
+                profile = this.mqZoneSink.pollDispatchedRecord();
                 if (profile == null) {
                     this.sleepOneInterval();
                     continue;
                 }
                 // send
                 this.zoneProducer.send(profile);
-            } catch (Throwable e) {
-                LOG.error(e.getMessage(), e);
+            } catch (Throwable e1) {
                 if (profile != null) {
-                    context.getDispatchQueue().offer(profile);
+                    this.mqZoneSink.offerDispatchRecord(profile);
+                }
+                if (logCounter.shouldPrint()) {
+                    logger.error("{} send message failure", workerName, e1);
                 }
                 this.sleepOneInterval();
             }
         }
+        logger.info("{} exit message zone worker", this.workerName);
     }
 
     /**
@@ -96,9 +103,11 @@ public class MessageQueueZoneWorker extends Thread {
      */
     private void sleepOneInterval() {
         try {
-            Thread.sleep(context.getProcessInterval());
+            Thread.sleep(fetchWaitMs);
         } catch (InterruptedException e1) {
-            LOG.error(e1.getMessage(), e1);
+            if (logCounter.shouldPrint()) {
+                logger.error("{} wait poll record interrupted", workerName, e1);
+            }
         }
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
index 1867e6a8ce..87ebe736c8 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
@@ -33,8 +33,8 @@ public abstract class PackProfile {
     private final long dispatchTime;
     private final long createTime = System.currentTimeMillis();
     private final String uid;
-    protected long count = 0;
-    protected long size = 0;
+    protected int count = 0;
+    protected int size = 0;
     protected final boolean enableRetryAfterFailure;
     protected final int maxRetries;
     protected int retries = 0;
@@ -96,7 +96,7 @@ public abstract class PackProfile {
      *
      * @return the count
      */
-    public long getCount() {
+    public int getCount() {
         return count;
     }
 
@@ -105,7 +105,7 @@ public abstract class PackProfile {
      *
      * @param count the count to set
      */
-    public void setCount(long count) {
+    public void setCount(int count) {
         this.count = count;
     }
 
@@ -114,7 +114,7 @@ public abstract class PackProfile {
      *
      * @return the size
      */
-    public long getSize() {
+    public int getSize() {
         return size;
     }
 
@@ -123,7 +123,7 @@ public abstract class PackProfile {
      *
      * @param size the size to set
      */
-    public void setSize(long size) {
+    public void setSize(int size) {
         this.size = size;
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/RandomCacheClusterSelector.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/RandomCacheClusterSelector.java
index 662cadb173..fdf23d49de 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/RandomCacheClusterSelector.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/RandomCacheClusterSelector.java
@@ -44,7 +44,7 @@ public class RandomCacheClusterSelector implements CacheClusterSelector, Configu
 
     /**
      * select
-     * @param allClusterList
+     * @param allConfigList
      * @return
      */
     @Override
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
index c816feb14e..5949d38e4e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
@@ -25,7 +25,6 @@ import org.apache.inlong.common.util.NetworkUtils;
 import org.apache.inlong.dataproxy.base.SinkRspEvent;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.source2.InLongMessageHandler;
-import org.apache.inlong.dataproxy.utils.Constants;
 import org.apache.inlong.sdk.commons.protocol.EventConstants;
 import org.apache.inlong.sdk.commons.protocol.InlongId;
 
@@ -46,10 +45,9 @@ import java.util.Map;
  */
 public class SimplePackProfile extends PackProfile {
 
-    // log print count
-    private static final LogCounter logCounter =
-            new LogCounter(10, 100000, 30 * 1000);
     private static final Logger logger = LoggerFactory.getLogger(SimplePackProfile.class);
+    // log print count
+    private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
     private static final long MINUTE_MS = 60L * 1000;
     private boolean needRspEvent = false;
     private Channel channel;
@@ -154,9 +152,10 @@ public class SimplePackProfile extends PackProfile {
      */
     public Map<String, String> getPropsToMQ() {
         Map<String, String> result = new HashMap<>();
-        result.put(Constants.HEADER_KEY_SOURCE_TIME, event.getHeaders().get(AttributeConstants.RCV_TIME));
+        result.put(AttributeConstants.RCV_TIME, event.getHeaders().get(AttributeConstants.RCV_TIME));
         result.put(ConfigConstants.MSG_ENCODE_VER, event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER));
         result.put(EventConstants.HEADER_KEY_VERSION, event.getHeaders().get(EventConstants.HEADER_KEY_VERSION));
+        result.put(ConfigConstants.REMOTE_IP_KEY, event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY));
         result.put(ConfigConstants.DATAPROXY_IP_KEY, NetworkUtils.getLocalIp());
         return result;
     }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index e91f41f4b4..126ca385dc 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -52,7 +52,7 @@ import java.util.Set;
  */
 public class KafkaHandler implements MessageQueueHandler {
 
-    public static final Logger LOG = LoggerFactory.getLogger(KafkaHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(KafkaHandler.class);
     // log print count
     private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
 
@@ -88,11 +88,11 @@ public class KafkaHandler implements MessageQueueHandler {
             Context context = this.sinkContext.getProducerContext();
             props.putAll(context.getParameters());
             props.putAll(config.getParams());
-            LOG.info("try to create kafka client:{}", props);
+            logger.info("try to create kafka client:{}", props);
             producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
-            LOG.info("create new producer success:{}", producer);
+            logger.info("create new producer success:{}", producer);
         } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
+            logger.error(e.getMessage(), e);
         }
     }
 
@@ -108,7 +108,7 @@ public class KafkaHandler implements MessageQueueHandler {
     public void stop() {
         // kafka producer
         this.producer.close();
-        LOG.info("kafka handler stopped");
+        logger.info("kafka handler stopped");
     }
 
     /**
@@ -128,7 +128,7 @@ public class KafkaHandler implements MessageQueueHandler {
                     sinkContext.fileMetricIncWithDetailStats(
                             StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, profile.getUid());
                     sinkContext.addSendResultMetric(profile, clusterName, profile.getUid(), false, 0);
-                    sinkContext.getDispatchQueue().release(profile.getSize());
+                    sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
                     profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                     return false;
                 }
@@ -136,7 +136,7 @@ public class KafkaHandler implements MessageQueueHandler {
                 if (StringUtils.isEmpty(topic)) {
                     sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
                     sinkContext.addSendResultMetric(profile, clusterName, profile.getUid(), false, 0);
-                    sinkContext.getDispatchQueue().release(profile.getSize());
+                    sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
                     profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                     return false;
                 }
@@ -162,7 +162,7 @@ public class KafkaHandler implements MessageQueueHandler {
             sinkContext.processSendFail(profile, clusterName, profile.getUid(), 0,
                     DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, ex.getMessage());
             if (logCounter.shouldPrint()) {
-                LOG.error("Send Message to Kafka failure", ex);
+                logger.error("Send Message to Kafka failure", ex);
             }
             return false;
         }
@@ -205,12 +205,12 @@ public class KafkaHandler implements MessageQueueHandler {
                     sinkContext.processSendFail(batchProfile, clusterName, topic, sendTime,
                             DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                     if (logCounter.shouldPrint()) {
-                        LOG.error("Send BatchPackProfile to Kafka failure", ex);
+                        logger.error("Send BatchPackProfile to Kafka failure", ex);
                     }
                 } else {
                     sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                     sinkContext.addSendResultMetric(batchProfile, clusterName, topic, true, sendTime);
-                    sinkContext.getDispatchQueue().release(batchProfile.getSize());
+                    sinkContext.getMqZoneSink().releaseAcquiredSizePermit(batchProfile);
                     batchProfile.ack();
                 }
             }
@@ -251,14 +251,14 @@ public class KafkaHandler implements MessageQueueHandler {
                     sinkContext.processSendFail(simpleProfile, clusterName, topic, sendTime,
                             DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                     if (logCounter.shouldPrint()) {
-                        LOG.error("Send SimplePackProfile to Kafka failure", ex);
+                        logger.error("Send SimplePackProfile to Kafka failure", ex);
                     }
                 } else {
                     sinkContext.fileMetricAddSuccCnt(simpleProfile, topic,
                             arg0 == null ? "" : String.valueOf(arg0.partition()));
                     sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                     sinkContext.addSendResultMetric(simpleProfile, clusterName, topic, true, sendTime);
-                    sinkContext.getDispatchQueue().release(simpleProfile.getSize());
+                    sinkContext.getMqZoneSink().releaseAcquiredSizePermit(simpleProfile);
                     simpleProfile.ack();
                 }
             }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 3933045d74..66f8067014 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -63,7 +63,7 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_STATS_INTER
  */
 public class PulsarHandler implements MessageQueueHandler {
 
-    public static final Logger LOG = LoggerFactory.getLogger(PulsarHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(PulsarHandler.class);
     // log print count
     private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
 
@@ -166,9 +166,9 @@ public class PulsarHandler implements MessageQueueHandler {
                     .enableBatching(context.getBoolean(KEY_ENABLEBATCHING, true))
                     .compressionType(this.getPulsarCompressionType());
         } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
+            logger.error(e.getMessage(), e);
         }
-        LOG.info("pulsar handler started");
+        logger.info("pulsar handler started");
     }
 
     /**
@@ -180,15 +180,15 @@ public class PulsarHandler implements MessageQueueHandler {
             try {
                 entry.getValue().close();
             } catch (PulsarClientException e) {
-                LOG.error(e.getMessage(), e);
+                logger.error(e.getMessage(), e);
             }
         }
         try {
             this.client.close();
         } catch (PulsarClientException e) {
-            LOG.error(e.getMessage(), e);
+            logger.error(e.getMessage(), e);
         }
-        LOG.info("pulsar handler stopped");
+        logger.info("pulsar handler stopped");
     }
 
     @Override
@@ -213,7 +213,7 @@ public class PulsarHandler implements MessageQueueHandler {
                     sinkContext.fileMetricIncWithDetailStats(
                             StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, profile.getUid());
                     sinkContext.addSendResultMetric(profile, clusterName, profile.getUid(), false, 0);
-                    sinkContext.getDispatchQueue().release(profile.getSize());
+                    sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
                     profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                     return false;
                 }
@@ -221,7 +221,7 @@ public class PulsarHandler implements MessageQueueHandler {
                 if (StringUtils.isEmpty(producerTopic)) {
                     sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
                     sinkContext.addSendResultMetric(profile, clusterName, profile.getUid(), false, 0);
-                    sinkContext.getDispatchQueue().release(profile.getSize());
+                    sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
                     profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                     return false;
                 }
@@ -233,22 +233,22 @@ public class PulsarHandler implements MessageQueueHandler {
             Producer<byte[]> producer = this.producerMap.get(producerTopic);
             if (producer == null) {
                 try {
-                    LOG.info("try to new a object for topic " + producerTopic);
+                    logger.info("try to new a object for topic " + producerTopic);
                     SecureRandom secureRandom = new SecureRandom(
                             (producerTopic + System.currentTimeMillis()).getBytes());
                     String producerName = producerTopic + "-" + secureRandom.nextLong();
                     producer = baseBuilder.clone().topic(producerTopic)
                             .producerName(producerName)
                             .create();
-                    LOG.info("create new producer success:{}", producer.getProducerName());
+                    logger.info("create new producer success:{}", producer.getProducerName());
                     Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(producerTopic, producer);
                     if (oldProducer != null) {
                         producer.close();
-                        LOG.info("close producer success:{}", producer.getProducerName());
+                        logger.info("close producer success:{}", producer.getProducerName());
                         producer = oldProducer;
                     }
                 } catch (Throwable ex) {
-                    LOG.error("create new producer failed", ex);
+                    logger.error("create new producer failed", ex);
                 }
             }
             // create producer failed
@@ -270,7 +270,7 @@ public class PulsarHandler implements MessageQueueHandler {
             sinkContext.processSendFail(profile, clusterName, profile.getUid(), 0,
                     DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, ex.getMessage());
             if (logCounter.shouldPrint()) {
-                LOG.error("Send Message to Pulsar failure", ex);
+                logger.error("Send Message to Pulsar failure", ex);
             }
             return false;
         }
@@ -327,12 +327,12 @@ public class PulsarHandler implements MessageQueueHandler {
                 sinkContext.processSendFail(batchProfile, clusterName, producerTopic, sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                 if (logCounter.shouldPrint()) {
-                    LOG.error("Send ProfileV1 to Pulsar failure", ex);
+                    logger.error("Send ProfileV1 to Pulsar failure", ex);
                 }
             } else {
                 sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(batchProfile, clusterName, producerTopic, true, sendTime);
-                sinkContext.getDispatchQueue().release(batchProfile.getSize());
+                sinkContext.getMqZoneSink().releaseAcquiredSizePermit(batchProfile);
                 batchProfile.ack();
             }
         });
@@ -362,13 +362,13 @@ public class PulsarHandler implements MessageQueueHandler {
                 sinkContext.processSendFail(simpleProfile, clusterName, producerTopic, sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                 if (logCounter.shouldPrint()) {
-                    LOG.error("Send SimpleProfileV0 to Pulsar failure", ex);
+                    logger.error("Send SimpleProfileV0 to Pulsar failure", ex);
                 }
             } else {
                 sinkContext.fileMetricAddSuccCnt(simpleProfile, producerTopic, msgId.toString());
                 sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(simpleProfile, clusterName, producerTopic, true, sendTime);
-                sinkContext.getDispatchQueue().release(simpleProfile.getSize());
+                sinkContext.getMqZoneSink().releaseAcquiredSizePermit(simpleProfile);
                 simpleProfile.ack();
             }
         });
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index e8054f8863..dcd28d13c3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -55,7 +55,7 @@ import java.util.Set;
  */
 public class TubeHandler implements MessageQueueHandler {
 
-    public static final Logger LOG = LoggerFactory.getLogger(TubeHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(TubeHandler.class);
     // log print count
     private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
 
@@ -98,12 +98,12 @@ public class TubeHandler implements MessageQueueHandler {
         try {
             // prepare configuration
             TubeClientConfig conf = initTubeConfig();
-            LOG.info("try to create producer:{}", conf.toJsonString());
+            logger.info("try to create producer:{}", conf.toJsonString());
             this.sessionFactory = new TubeMultiSessionFactory(conf);
             this.producer = sessionFactory.createProducer();
-            LOG.info("create new producer success:{}", producer);
+            logger.info("create new producer success:{}", producer);
         } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
+            logger.error(e.getMessage(), e);
         }
     }
 
@@ -116,10 +116,10 @@ public class TubeHandler implements MessageQueueHandler {
         try {
             published = producer.publish(newTopicSet);
             this.topicSet.addAll(newTopicSet);
-            LOG.info("Publish topics to {}, need publish are {}, published are {}",
+            logger.info("Publish topics to {}, need publish are {}, published are {}",
                     this.clusterName, newTopicSet, published);
         } catch (Throwable e) {
-            LOG.warn("Publish topics to {} failure", this.clusterName, e);
+            logger.warn("Publish topics to {} failure", this.clusterName, e);
         }
     }
 
@@ -166,17 +166,17 @@ public class TubeHandler implements MessageQueueHandler {
             try {
                 this.producer.shutdown();
             } catch (Throwable e) {
-                LOG.error(e.getMessage(), e);
+                logger.error(e.getMessage(), e);
             }
         }
         if (this.sessionFactory != null) {
             try {
                 this.sessionFactory.shutdown();
             } catch (TubeClientException e) {
-                LOG.error(e.getMessage(), e);
+                logger.error(e.getMessage(), e);
             }
         }
-        LOG.info("tube handler stopped");
+        logger.info("tube handler stopped");
     }
 
     /**
@@ -193,7 +193,7 @@ public class TubeHandler implements MessageQueueHandler {
                     sinkContext.fileMetricIncWithDetailStats(
                             StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, profile.getUid());
                     sinkContext.addSendResultMetric(profile, clusterName, profile.getUid(), false, 0);
-                    sinkContext.getDispatchQueue().release(profile.getSize());
+                    sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
                     profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                     return false;
                 }
@@ -201,7 +201,7 @@ public class TubeHandler implements MessageQueueHandler {
                 if (StringUtils.isEmpty(topic)) {
                     sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
                     sinkContext.addSendResultMetric(profile, clusterName, profile.getUid(), false, 0);
-                    sinkContext.getDispatchQueue().release(profile.getSize());
+                    sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
                     profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                     return false;
                 }
@@ -233,7 +233,7 @@ public class TubeHandler implements MessageQueueHandler {
             sinkContext.processSendFail(profile, clusterName, profile.getUid(), 0,
                     DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, ex.getMessage());
             if (logCounter.shouldPrint()) {
-                LOG.error("Send Message to Tube failure", ex);
+                logger.error("Send Message to Tube failure", ex);
             }
             return false;
         }
@@ -270,14 +270,14 @@ public class TubeHandler implements MessageQueueHandler {
                 if (result.isSuccess()) {
                     sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                     sinkContext.addSendResultMetric(batchProfile, clusterName, topic, true, sendTime);
-                    sinkContext.getDispatchQueue().release(batchProfile.getSize());
+                    sinkContext.getMqZoneSink().releaseAcquiredSizePermit(batchProfile);
                     batchProfile.ack();
                 } else {
                     sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILURE);
                     sinkContext.processSendFail(batchProfile, clusterName, topic, sendTime,
                             DataProxyErrCode.MQ_RETURN_ERROR, result.getErrMsg());
                     if (logCounter.shouldPrint()) {
-                        LOG.error("Send ProfileV1 to tube failure {}", result.getErrMsg());
+                        logger.error("Send ProfileV1 to tube failure {}", result.getErrMsg());
                     }
                 }
             }
@@ -288,7 +288,7 @@ public class TubeHandler implements MessageQueueHandler {
                 sinkContext.processSendFail(batchProfile, clusterName, topic, sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                 if (logCounter.shouldPrint()) {
-                    LOG.error("Send ProfileV1 to tube exception", ex);
+                    logger.error("Send ProfileV1 to tube exception", ex);
                 }
             }
         };
@@ -319,7 +319,7 @@ public class TubeHandler implements MessageQueueHandler {
                     sinkContext.fileMetricAddSuccCnt(simpleProfile, topic, result.getPartition().getHost());
                     sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                     sinkContext.addSendResultMetric(simpleProfile, clusterName, topic, true, sendTime);
-                    sinkContext.getDispatchQueue().release(simpleProfile.getSize());
+                    sinkContext.getMqZoneSink().releaseAcquiredSizePermit(simpleProfile);
                     simpleProfile.ack();
                 } else {
                     sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_FAILURE,
@@ -328,7 +328,7 @@ public class TubeHandler implements MessageQueueHandler {
                     sinkContext.processSendFail(simpleProfile, clusterName, topic, sendTime,
                             DataProxyErrCode.MQ_RETURN_ERROR, result.getErrMsg());
                     if (logCounter.shouldPrint()) {
-                        LOG.error("Send SimpleProfileV0 to tube failure: {}", result.getErrMsg());
+                        logger.error("Send SimpleProfileV0 to tube failure: {}", result.getErrMsg());
                     }
                 }
             }
@@ -340,7 +340,7 @@ public class TubeHandler implements MessageQueueHandler {
                 sinkContext.processSendFail(simpleProfile, clusterName, topic, sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                 if (logCounter.shouldPrint()) {
-                    LOG.error("Send SimpleProfileV0 to tube exception", ex);
+                    logger.error("Send SimpleProfileV0 to tube exception", ex);
                 }
             }
         };
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v1msg/InlongTcpSourceCallback.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v1msg/InlongTcpSourceCallback.java
index 1acb241a6d..c11cffa63e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v1msg/InlongTcpSourceCallback.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v1msg/InlongTcpSourceCallback.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.source2.v1msg;
 
+import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePackHeader;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResponseInfo;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
@@ -37,8 +38,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class InlongTcpSourceCallback implements SourceCallback {
 
-    public static final Logger LOG = LoggerFactory.getLogger(InlongTcpSourceCallback.class);
-
+    private static final Logger logger = LoggerFactory.getLogger(InlongTcpSourceCallback.class);
+    // log print count
+    private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
     private final ChannelHandlerContext ctx;
     private final MessagePackHeader header;
     private final CountDownLatch latch;
@@ -47,8 +49,8 @@ public class InlongTcpSourceCallback implements SourceCallback {
     /**
      * Constructor
      *
-     * @param ctx
-     * @param header
+     * @param ctx the channel context
+     * @param header the message pack header
      */
     public InlongTcpSourceCallback(ChannelHandlerContext ctx, MessagePackHeader header) {
         this.ctx = ctx;
@@ -59,7 +61,7 @@ public class InlongTcpSourceCallback implements SourceCallback {
     /**
      * callback
      *
-     * @param resultCode
+     * @param resultCode the result code
      */
     @Override
     public void callback(ResultCode resultCode) {
@@ -82,13 +84,17 @@ public class InlongTcpSourceCallback implements SourceCallback {
             if (remoteChannel.isWritable()) {
                 remoteChannel.write(buffer);
             } else {
-                LOG.warn("the send buffer2 is full, so disconnect it!"
-                        + "please check remote client; Connection info:{}",
-                        remoteChannel);
+                if (logCounter.shouldPrint()) {
+                    logger.warn("the send buffer2 is full, so disconnect it!"
+                            + " please check remote client; Connection info:{}",
+                            remoteChannel);
+                }
                 buffer.release();
             }
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+        } catch (Throwable e) {
+            if (logCounter.shouldPrint()) {
+                logger.error("Send response failure", e);
+            }
         } finally {
             // notice TCP session
             this.latch.countDown();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
index 33e51485a6..15eea923f5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
@@ -27,14 +27,14 @@ public class BufferQueue<A> {
 
     private final LinkedBlockingQueue<A> queue;
     private final SizeSemaphore currentTokens;
-    private SizeSemaphore globalTokens;
+    private SizeSemaphore globalTokens = null;
     private final AtomicLong offerCount = new AtomicLong(0);
     private final AtomicLong pollCount = new AtomicLong(0);
 
     /**
      * Constructor
      * 
-     * @param maxSizeKb
+     * @param maxSizeKb  the initial size of permits to acquire
      */
     public BufferQueue(int maxSizeKb) {
         this.queue = new LinkedBlockingQueue<>();
@@ -44,8 +44,8 @@ public class BufferQueue<A> {
     /**
      * Constructor
      * 
-     * @param maxSizeKb
-     * @param globalTokens
+     * @param maxSizeKb    the initial size of permits to acquire
+     * @param globalTokens the global permit semaphore
      */
     public BufferQueue(int maxSizeKb, SizeSemaphore globalTokens) {
         this(maxSizeKb);
@@ -109,42 +109,49 @@ public class BufferQueue<A> {
     }
 
     /**
-     * tryAcquire
+     * try to acquire required size permit
+     *
+     * @param sizeInByte  the size of permits to acquire
+     * @return  true if the permits were acquired and false otherwise
      */
     public boolean tryAcquire(long sizeInByte) {
-        boolean cidResult = currentTokens.tryAcquire(sizeInByte);
-        if (!cidResult) {
-            return false;
-        }
         if (this.globalTokens == null) {
-            return true;
-        }
-        boolean globalResult = this.globalTokens.tryAcquire(sizeInByte);
-        if (globalResult) {
-            return true;
+            return currentTokens.tryAcquire(sizeInByte);
+        } else {
+            if (!this.globalTokens.tryAcquire(sizeInByte)) {
+                return false;
+            }
+            if (currentTokens.tryAcquire(sizeInByte)) {
+                return true;
+            } else {
+                this.globalTokens.release(sizeInByte);
+                return false;
+            }
         }
-        currentTokens.release(sizeInByte);
-        return false;
     }
 
     /**
-     * acquire
+     * acquire size permit
+     *
+     * @param sizeInByte the size of permits to acquire
      */
     public void acquire(long sizeInByte) {
-        currentTokens.acquire(sizeInByte);
         if (this.globalTokens != null) {
             globalTokens.acquire(sizeInByte);
         }
+        currentTokens.acquire(sizeInByte);
     }
 
     /**
-     * release
+     * release size permit
+     *
+     * @param sizeInByte the size of permits to release
      */
     public void release(long sizeInByte) {
+        this.currentTokens.release(sizeInByte);
         if (this.globalTokens != null) {
             this.globalTokens.release(sizeInByte);
         }
-        this.currentTokens.release(sizeInByte);
     }
 
     /**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java
index 99b1b35e86..adb4881b68 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java
@@ -30,7 +30,7 @@ public class SizeSemaphore {
     private int maxSize = 0;
     private int leftSize = 0;
     private Semaphore sizeSemaphore = null;
-    private AtomicInteger leftSemaphore = new AtomicInteger(0);
+    private final AtomicInteger leftSemaphore = new AtomicInteger(0);
 
     /**
      * Constructor