Posted to commits@inlong.apache.org by go...@apache.org on 2022/08/15 10:56:55 UTC

[inlong] branch master updated: [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink (#5545)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 27217e34b [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink (#5545)
27217e34b is described below

commit 27217e34ba57097019fe0ebdac013363e95e2cc2
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Aug 15 18:56:48 2022 +0800

    [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink (#5545)
---
 .../dataproxy/config/pojo/MQClusterConfig.java     |  13 +-
 .../dataproxy/sink/SimpleMessageTubeSink.java      |  34 +-
 .../org/apache/inlong/dataproxy/sink/TubeSink.java | 881 ++++++++-------------
 .../dataproxy/sink/common/MsgDedupHandler.java     |   8 +
 .../dataproxy/sink/common/TubeProducerHolder.java  | 277 +++++++
 .../inlong/dataproxy/sink/common/TubeUtils.java    |  80 ++
 6 files changed, 731 insertions(+), 562 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java
index b49234b18..f1a87708e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java
@@ -86,7 +86,7 @@ public class MQClusterConfig extends Context {
     private static final int DEFAULT_MAX_TOPICS_EACH_PRODUCER_HOLD = 200;
 
     private static final String TUBE_REQUEST_TIMEOUT = "tube_request_timeout";
-    private static final int DEFAULT_TUBE_REQUEST_TIMEOUT = 60;
+    private static final long DEFAULT_TUBE_REQUEST_TIMEOUT = 20000L;
 
     private static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = "link_max_allowed_delayed_msg_count";
     private static final long DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 80000L;
@@ -100,6 +100,9 @@ public class MQClusterConfig extends Context {
     private static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark";
     private static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 1024 * 1024L;
 
+    private static final String HEARTBEAT_C2M_PERIOD_MS_MARK = "tube_heartbeat_period_ms";
+    private static final long DEFAULT_HEARTBEAT_C2M_PERIOD_MS = 15000L;
+
     private static final String RECOVER_THREAD_COUNT = "recover_thread_count";
     private static final int DEFAULT_RECOVER_THREAD_COUNT = Runtime.getRuntime().availableProcessors() + 1;
 
@@ -159,6 +162,10 @@ public class MQClusterConfig extends Context {
         return getLong(NETTY_WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK);
     }
 
+    public long getTubeHeartbeatPeriodMs() {
+        return getLong(HEARTBEAT_C2M_PERIOD_MS_MARK, DEFAULT_HEARTBEAT_C2M_PERIOD_MS);
+    }
+
     public int getRecoverThreadCount() {
         return getInteger(RECOVER_THREAD_COUNT, DEFAULT_RECOVER_THREAD_COUNT);
     }
@@ -284,8 +291,8 @@ public class MQClusterConfig extends Context {
         return getInteger(MAX_TOPICS_EACH_PRODUCER_HOLD, DEFAULT_MAX_TOPICS_EACH_PRODUCER_HOLD);
     }
 
-    public int getTubeRequestTimeout() {
-        return getInteger(TUBE_REQUEST_TIMEOUT, DEFAULT_TUBE_REQUEST_TIMEOUT);
+    public long getTubeRpcTimeoutMs() {
+        return getLong(TUBE_REQUEST_TIMEOUT, DEFAULT_TUBE_REQUEST_TIMEOUT);
     }
 
     public String getLogTopic() {
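
For reference, the two getters added above plausibly replace the heartbeat and RPC timeout
values that TubeSink previously hard-coded (15000L / 20000L, see the removed initTubeConfig
further down). A minimal sketch of how they could be wired into a TubeClientConfig; the
method name and the class it would live in are assumptions, not part of this patch:

    // Sketch only, assuming org.apache.inlong.tubemq.client.config.TubeClientConfig
    // and org.apache.inlong.dataproxy.config.pojo.MQClusterConfig are on the classpath.
    private TubeClientConfig buildClientConfig(String masterUrl, MQClusterConfig cfg) {
        TubeClientConfig clientConfig = new TubeClientConfig(masterUrl);
        clientConfig.setLinkMaxAllowedDelayedMsgCount(cfg.getLinkMaxAllowedDelayedMsgCount());
        clientConfig.setSessionWarnDelayedMsgCount(cfg.getSessionWarnDelayedMsgCount());
        clientConfig.setSessionMaxAllowedDelayedMsgCount(cfg.getSessionMaxAllowedDelayedMsgCount());
        clientConfig.setNettyWriteBufferHighWaterMark(cfg.getNettyWriteBufferHighWaterMark());
        // previously fixed constants in TubeSink#initTubeConfig, now configurable
        clientConfig.setHeartbeatPeriodMs(cfg.getTubeHeartbeatPeriodMs());
        clientConfig.setRpcTimeoutMs(cfg.getTubeRpcTimeoutMs());
        return clientConfig;
    }
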
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
index 990b44cac..521f3c24f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
@@ -43,12 +43,12 @@ import org.apache.flume.source.shaded.guava.RateLimiter;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
+import org.apache.inlong.dataproxy.sink.common.TubeUtils;
 import org.apache.inlong.dataproxy.utils.Constants;
 import org.apache.inlong.dataproxy.utils.NetworkUtils;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
@@ -57,7 +57,6 @@ import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
 import org.apache.inlong.tubemq.client.producer.MessageProducer;
 import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
 import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
 import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
 import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
 import org.slf4j.Logger;
@@ -346,6 +345,7 @@ public class SimpleMessageTubeSink extends AbstractSink implements Configurable
     }
 
     class SinkTask implements Runnable {
+
         private void sendMessage(Event event, String topic, AtomicBoolean flag, EventStat es)
             throws TubeClientException, InterruptedException {
             if (msgDedupHandler.judgeDupAndPutMsgSeqId(
@@ -353,38 +353,12 @@ public class SimpleMessageTubeSink extends AbstractSink implements Configurable
                 logger.info("{} agent package {} existed,just discard.",
                         getName(), event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
             } else {
-                Message message = this.parseEvent2Message(topic, event);
-                producer.sendMessage(message, new MyCallback(es));
+                producer.sendMessage(TubeUtils.buildMessage(
+                        topic, event, true), new MyCallback(es));
                 flag.set(true);
             }
             illegalTopicMap.remove(topic);
         }
-        
-        /**
-         * parseEvent2Message
-         * @param topic
-         * @param event
-         * @return
-         */
-        private Message parseEvent2Message(String topic, Event event) {
-            Message message = new Message(topic, event.getBody());
-            message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
-            String streamId = "";
-            if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
-                streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
-            } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
-                streamId = event.getHeaders().get(AttributeConstants.INAME);
-            }
-            message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-            // common attributes
-            Map<String, String> headers = event.getHeaders();
-            message.setAttrKeyVal(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID));
-            message.setAttrKeyVal(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID));
-            message.setAttrKeyVal(Constants.TOPIC, headers.get(Constants.TOPIC));
-            message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME));
-            message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP));
-            return message;
-        }
 
         private void handleException(Throwable t, String topic, boolean decrementFlag, EventStat es) {
             if (t instanceof TubeClientException) {
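
The inline parseEvent2Message removed above is replaced by TubeUtils.buildMessage(topic,
event, true). The new helper is not shown in this hunk; the sketch below only reconstructs
a plausible shape from the removed logic. The meaning of the boolean (whether to attach the
common InLong attributes) is inferred from the two call sites (true here, false in TubeSink),
so both the parameter name and that branch are assumptions:

    // Reconstruction only -- not the actual content of TubeUtils.java.
    public static Message buildMessage(String topic, Event event, boolean addCommonAttrs) {
        Message message = new Message(topic, event.getBody());
        message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
        String streamId = "";
        if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
            streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
        } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
            streamId = event.getHeaders().get(AttributeConstants.INAME);
        }
        message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
        if (addCommonAttrs) {
            // common attributes, as set by the removed parseEvent2Message
            Map<String, String> headers = event.getHeaders();
            message.setAttrKeyVal(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID));
            message.setAttrKeyVal(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID));
            message.setAttrKeyVal(Constants.TOPIC, headers.get(Constants.TOPIC));
            message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME));
            message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP));
        }
        return message;
    }
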
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 00abce075..e49903329 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -17,20 +17,19 @@
 
 package org.apache.inlong.dataproxy.sink;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import com.google.common.base.Preconditions;
 import org.apache.commons.collections.SetUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -42,382 +41,172 @@ import org.apache.flume.conf.Configurable;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.flume.source.shaded.guava.RateLimiter;
 import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
 import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
-import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
+import org.apache.inlong.dataproxy.sink.common.TubeProducerHolder;
+import org.apache.inlong.dataproxy.sink.common.TubeUtils;
 import org.apache.inlong.dataproxy.utils.Constants;
-import org.apache.inlong.dataproxy.utils.NetworkUtils;
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
-import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
 import org.apache.inlong.tubemq.client.producer.MessageProducer;
 import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
 import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
 import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
-import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TubeSink extends AbstractSink implements Configurable {
 
     private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
-    private static final MsgDedupHandler msgDedupHandler = new MsgDedupHandler();
-    private static final ConcurrentHashMap<String, Long> illegalTopicMap = new ConcurrentHashMap<>();
+    private static final MsgDedupHandler MSG_DEDUP_HANDLER = new MsgDedupHandler();
+    private TubeProducerHolder producerHolder = null;
     private static final String TOPIC = "topic";
-    // key: masterUrl
-    public Map<String, TubeMultiSessionFactory> sessionFactories;
-    public Map<String, List<TopicProducerInfo>> masterUrl2producers;
-    // key: topic
-    public Map<String, List<TopicProducerInfo>> producerInfoMap;
     private volatile boolean canTake = false;
     private volatile boolean canSend = false;
+    private volatile boolean isOverFlow = false;
     private ConfigManager configManager;
     private Map<String, String> topicProperties;
     private MQClusterConfig tubeConfig;
+    private String usedMasterAddr = null;
     private Set<String> masterHostAndPortLists;
-
     // used for RoundRobin different cluster while send message
-    private AtomicInteger clusterIndex = new AtomicInteger(0);
-    private LinkedBlockingQueue<EventStat> resendQueue;
-    private LinkedBlockingQueue<Event> eventQueue;
     private RateLimiter diskRateLimiter;
     private Thread[] sinkThreadPool;
     private Map<String, String> dimensions;
     private DataProxyMetricItemSet metricItemSet;
-
-    private boolean overflow = false;
-
-    /**
-     * diff publish
-     */
-    public void diffSetPublish(Set<String> originalSet, Set<String> endSet) {
-        if (SetUtils.isEqualSet(originalSet, endSet)) {
-            return;
-        }
-
-        boolean changed = false;
-        Set<String> newTopics = new HashSet<>();
-        for (String s : endSet) {
-            if (!originalSet.contains(s)) {
-                changed = true;
-                newTopics.add(s);
-            }
-        }
-
-        if (changed) {
-            try {
-                initTopicSet(newTopics);
-            } catch (Exception e) {
-                logger.info("meta sink publish new topic fail.", e);
-            }
-
-            logger.info("topics.properties has changed, trigger diff publish for {}", getName());
-            topicProperties = configManager.getTopicProperties();
-        }
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private static final LogCounter LOG_SINK_TASK_PRINTER =
+            new LogCounter(10, 100000, 60 * 1000);
+    private LinkedBlockingQueue<Event> eventQueue;
+    private LinkedBlockingQueue<EventStat> resendQueue;
+    private final AtomicLong cachedMsgCnt = new AtomicLong(0);
+    private final AtomicLong takenMsgCnt = new AtomicLong(0);
+    private final AtomicLong resendMsgCnt = new AtomicLong(0);
+    private final AtomicLong blankTopicDiscardMsgCnt = new AtomicLong(0);
+    private final AtomicLong frozenTopicDiscardMsgCnt = new AtomicLong(0);
+    private final AtomicLong dupDiscardMsgCnt = new AtomicLong(0);
+    private final AtomicLong inflightMsgCnt = new AtomicLong(0);
+    private final AtomicLong successMsgCnt = new AtomicLong(0);
+    // statistic thread
+    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors
+            .newScheduledThreadPool(1, new HighPriorityThreadFactory("tubeSink-Printer-thread"));
+
+    {
+        SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(new TubeStatsTask(), 30L,
+                60L, TimeUnit.SECONDS);
+        logger.info("success to start performance statistic task!");
     }
 
-    /**
-     * when masterUrlLists change, update tubeClient
-     *
-     * @param originalCluster previous masterHostAndPortList set
-     * @param endCluster new masterHostAndPortList set
-     */
-    public void diffUpdateTubeClient(Set<String> originalCluster, Set<String> endCluster) {
-        if (SetUtils.isEqualSet(originalCluster, endCluster)) {
-            return;
-        }
-        // close
-        for (String masterUrl : originalCluster) {
-
-            if (!endCluster.contains(masterUrl)) {
-                // step1: close and remove all related producers
-                List<TopicProducerInfo> producerInfoList = masterUrl2producers.get(masterUrl);
-                if (producerInfoList != null) {
-                    for (TopicProducerInfo producerInfo : producerInfoList) {
-                        producerInfo.shutdown();
-                        // remove from topic<->producer map
-                        for (String topic : producerInfo.getTopicSet()) {
-                            List<TopicProducerInfo> curTopicProducers = producerInfoMap.get(topic);
-                            if (curTopicProducers != null) {
-                                curTopicProducers.remove(producerInfo);
-                            }
-                        }
-                    }
-                    // remove from masterUrl<->producer map
-                    masterUrl2producers.remove(masterUrl);
-                }
-
-                // step2: close and remove related sessionFactories
-                TubeMultiSessionFactory sessionFactory = sessionFactories.get(masterUrl);
-                if (sessionFactory != null) {
-                    try {
-                        sessionFactory.shutdown();
-                    } catch (TubeClientException e) {
-                        logger.error("destroy sessionFactory error in tubesink, MetaClientException {}",
-                                e.getMessage());
-                    }
-                    sessionFactories.remove(masterUrl);
-                }
-
-                logger.info("close tubeClient of masterList:{}", masterUrl);
-            }
-
-        }
-        // start new client
-        for (String masterUrl : endCluster) {
-            if (!originalCluster.contains(masterUrl)) {
-                TubeMultiSessionFactory sessionFactory = createConnection(masterUrl);
-                if (sessionFactory != null) {
-                    List<Set<String>> topicGroups = partitionTopicSet(new HashSet<>(topicProperties.values()));
-                    for (Set<String> topicSet : topicGroups) {
-                        createTopicProducers(masterUrl, sessionFactory, topicSet);
-                    }
-                    logger.info("successfully start new tubeClient for the new masterList: {}", masterUrl);
-                }
-            }
-        }
-
+    @Override
+    public void configure(Context context) {
+        logger.info(getName() + " configure from context: {}", context);
+        // initial parameters
+        configManager = ConfigManager.getInstance();
+        tubeConfig = configManager.getMqClusterConfig();
+        topicProperties = configManager.getTopicProperties();
         masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet();
-    }
-
-    /**
-     * when there are multi clusters, pick producer based on round-robin
-     */
-    private MessageProducer getProducer(String topic) throws TubeClientException {
-        if (producerInfoMap.containsKey(topic) && !producerInfoMap.get(topic).isEmpty()) {
-
-            List<TopicProducerInfo> producers = producerInfoMap.get(topic);
-            // round-roubin dispatch
-            int currentIndex = clusterIndex.getAndIncrement();
-            if (currentIndex > Integer.MAX_VALUE / 2) {
-                clusterIndex.set(0);
-            }
-            int producerIndex = currentIndex % producers.size();
-            return producers.get(producerIndex).getProducer();
-        }
-        return null;
-//        else {
-//            synchronized (this) {
-//              if (!producerInfoMap.containsKey(topic)) {
-//                 if (producer == null || currentPublishTopicNum.get() >= tubeConfig.getMaxTopicsEachProducerHold()) {
-//                        producer = sessionFactory.createProducer();
-//                        currentPublishTopicNum.set(0);
-//                    }
-//                    // publish topic
-//                    producer.publish(topic);
-//                    producerMap.put(topic, producer);
-//                    currentPublishTopicNum.incrementAndGet();
-//                }
-//            }
-//            return producerMap.get(topic);
-//        }
-    }
-
-    private TubeClientConfig initTubeConfig(String masterUrl) throws Exception {
-        final TubeClientConfig tubeClientConfig = new TubeClientConfig(masterUrl);
-        tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(tubeConfig.getLinkMaxAllowedDelayedMsgCount());
-        tubeClientConfig.setSessionWarnDelayedMsgCount(tubeConfig.getSessionWarnDelayedMsgCount());
-        tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(tubeConfig.getSessionMaxAllowedDelayedMsgCount());
-        tubeClientConfig.setNettyWriteBufferHighWaterMark(tubeConfig.getNettyWriteBufferHighWaterMark());
-        tubeClientConfig.setHeartbeatPeriodMs(15000L);
-        tubeClientConfig.setRpcTimeoutMs(20000L);
-
-        return tubeClientConfig;
-    }
-
-    /**
-     * If this function is called successively without calling {@see #destroyConnection()}, only the
-     * first call has any effect.
-     *
-     * @throws FlumeException if an RPC client connection could not be opened
-     */
-    private void initCreateConnection() throws FlumeException {
-        // check the TubeMQ address
-        if (masterHostAndPortLists == null || masterHostAndPortLists.isEmpty()) {
-            logger.warn("Failed to get TubeMQ Cluster, make sure register TubeMQ to manager successfully.");
-            return;
-        }
-        // if already connected, just skip
-        if (sessionFactories != null) {
-            return;
-        }
-        sessionFactories = new HashMap<>();
-        for (String masterUrl : masterHostAndPortLists) {
-            createConnection(masterUrl);
-        }
-
-        if (sessionFactories.size() == 0) {
-            throw new FlumeException("create tube sessionFactories err, please re-check");
-        }
-    }
-
-    private TubeMultiSessionFactory createConnection(String masterHostAndPortList) {
-        TubeMultiSessionFactory sessionFactory;
-        try {
-            TubeClientConfig conf = initTubeConfig(masterHostAndPortList);
-            sessionFactory = new TubeMultiSessionFactory(conf);
-            sessionFactories.put(masterHostAndPortList, sessionFactory);
-        } catch (Throwable e) {
-            logger.error("connect to tube meta error, maybe tube master set error/shutdown, please re-check", e);
-            throw new FlumeException("connect to tube meta error, maybe tube master set error/shutdown in progress, "
-                    + "please re-check");
-        }
-        return sessionFactory;
-    }
-
-    private void destroyConnection() {
-        for (List<TopicProducerInfo> producerInfoList : producerInfoMap.values()) {
-            for (TopicProducerInfo producerInfo : producerInfoList) {
-                producerInfo.shutdown();
-            }
-        }
-        producerInfoMap.clear();
-
-        if (sessionFactories != null) {
-            for (TubeMultiSessionFactory sessionFactory : sessionFactories.values()) {
-                try {
-                    sessionFactory.shutdown();
-                } catch (Exception e) {
-                    logger.error("destroy sessionFactory error in tubesink: ", e);
-                }
-            }
-        }
-        sessionFactories.clear();
-        masterUrl2producers.clear();
-        logger.debug("closed meta producer");
-    }
-
-    /**
-     * partition topicSet to different group, each group is associated with a producer;
-     * if there are multi clusters, then each group is associated with a set of producer
-     */
-    private List<Set<String>> partitionTopicSet(Set<String> topicSet) {
-        List<Set<String>> topicGroups = new ArrayList<>();
-
-        List<String> sortedList = new ArrayList<>(topicSet);
-        Collections.sort(sortedList);
-        int maxTopicsEachProducerHolder = tubeConfig.getMaxTopicsEachProducerHold();
-        int cycle = sortedList.size() / maxTopicsEachProducerHolder;
-        int remainder = sortedList.size() % maxTopicsEachProducerHolder;
-
-        for (int i = 0; i <= cycle; i++) {
-            // allocate topic
-            Set<String> subset = new HashSet<>();
-            int startIndex = i * maxTopicsEachProducerHolder;
-            int endIndex = startIndex + maxTopicsEachProducerHolder - 1;
-            if (i == cycle) {
-                if (remainder == 0) {
-                    continue;
-                } else {
-                    endIndex = startIndex + remainder - 1;
-                }
-            }
-            for (int index = startIndex; index <= endIndex; index++) {
-                subset.add(sortedList.get(index));
-            }
-
-            topicGroups.add(subset);
+        // start message deduplication handler
+        MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(),
+                tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize());
+        // only use first cluster address now
+        usedMasterAddr = getFirstClusterAddr(masterHostAndPortLists);
+        // create producer holder
+        producerHolder = new TubeProducerHolder(getName(),
+                usedMasterAddr, configManager.getMqClusterConfig());
+        // initial TubeMQ configure
+        //     initial resend queue size
+        int badEventQueueSize = tubeConfig.getBadEventQueueSize();
+        Preconditions.checkArgument(badEventQueueSize > 0, "badEventQueueSize must be > 0");
+        resendQueue = new LinkedBlockingQueue<>(badEventQueueSize);
+        //     initial sink thread pool
+        int threadNum = tubeConfig.getThreadNum();
+        Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
+        sinkThreadPool = new Thread[threadNum];
+        //     initial event queue size
+        int eventQueueSize = tubeConfig.getEventQueueSize();
+        Preconditions.checkArgument(eventQueueSize > 0, "eventQueueSize must be > 0");
+        eventQueue = new LinkedBlockingQueue<>(eventQueueSize);
+        //     initial disk rate limiter
+        if (tubeConfig.getDiskIoRatePerSec() != 0) {
+            diskRateLimiter = RateLimiter.create(tubeConfig.getDiskIoRatePerSec());
         }
-        return topicGroups;
-    }
-
-    /**
-     * create producer and publish topic
-     */
-    private void createTopicProducers(String masterUrl, TubeMultiSessionFactory sessionFactory,
-            Set<String> topicGroup) {
-
-        TopicProducerInfo info = new TopicProducerInfo(sessionFactory);
-        info.initProducer();
-        Set<String> succTopicSet = info.publishTopic(topicGroup);
-
-        masterUrl2producers.computeIfAbsent(masterUrl, k -> new ArrayList<>()).add(info);
-
-        if (succTopicSet != null) {
-            for (String succTopic : succTopicSet) {
-                producerInfoMap.computeIfAbsent(succTopic, k -> new ArrayList<>()).add(info);
-
+        // register configure change callback functions
+        configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
+            @Override
+            public void update() {
+                diffSetPublish(new HashSet<>(topicProperties.values()),
+                        new HashSet<>(configManager.getTopicProperties().values()));
             }
-        }
-    }
-
-    private void initTopicSet(Set<String> topicSet) throws Exception {
-        long startTime = System.currentTimeMillis();
-
-        if (sessionFactories != null) {
-            List<Set<String>> topicGroups = partitionTopicSet(topicSet);
-            for (Set<String> subset : topicGroups) {
-                for (Map.Entry<String, TubeMultiSessionFactory> entry : sessionFactories.entrySet()) {
-                    createTopicProducers(entry.getKey(), entry.getValue(), subset);
-                }
+        });
+        configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
+            @Override
+            public void update() {
+                diffUpdateTubeClient(masterHostAndPortLists,
+                        configManager.getMqClusterUrl2Token().keySet());
             }
-            logger.info(getName() + " producer is ready for topics : " + producerInfoMap.keySet());
-            logger.info(getName() + " initTopicSet cost: " + (System.currentTimeMillis() - startTime) + "ms");
-        }
+        });
     }
 
     @Override
     public void start() {
+        if (!this.started.compareAndSet(false, true)) {
+            logger.info("Duplicated call, " + getName() + " has started!");
+            return;
+        }
         this.dimensions = new HashMap<>();
         this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
         this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
         // register metrics
         this.metricItemSet = new DataProxyMetricItemSet(this.getName());
         MetricRegister.register(metricItemSet);
-
         // create tube connection
         try {
-            initCreateConnection();
+            producerHolder.start(new HashSet<>(topicProperties.values()));
         } catch (FlumeException e) {
-            logger.error("Unable to create tube client" + ". Exception follows.", e);
-            // Try to prevent leaking resources
-            destroyConnection();
-            // FIXME: Mark ourselves as failed
-            stop();
+            logger.error("Unable to start TubeMQ client. Exception follows.", e);
+            super.stop();
             return;
         }
-
         // start the cleaner thread
-
         super.start();
         this.canSend = true;
         this.canTake = true;
-
-        try {
-            initTopicSet(new HashSet<String>(topicProperties.values()));
-        } catch (Exception e) {
-            logger.info("meta sink start publish topic fail.", e);
-        }
-
         for (int i = 0; i < sinkThreadPool.length; i++) {
-            sinkThreadPool[i] = new Thread(new SinkTask(), getName() + "_tube_sink_sender-" + i);
+            sinkThreadPool[i] = new Thread(new TubeSinkTask(),
+                    getName() + "_tube_sink_sender-" + i);
             sinkThreadPool[i].start();
         }
-
+        logger.info(getName() + " started!");
     }
 
-    /**
-     * resend event
-     */
-    private void resendEvent(EventStat es, boolean isDecrement) {
-        try {
-            if (es == null || es.getEvent() == null) {
-                return;
+    @Override
+    public void stop() {
+        if (!this.started.compareAndSet(true, false)) {
+            logger.info("Duplicated call, " + getName() + " has stopped!");
+            return;
+        }
+        this.canTake = false;
+        if (sinkThreadPool != null) {
+            for (Thread thread : sinkThreadPool) {
+                if (thread == null) {
+                    continue;
+                }
+                thread.interrupt();
             }
-            msgDedupHandler.invalidMsgSeqId(es.getEvent()
-                    .getHeaders().get(ConfigConstants.SEQUENCE_ID));
-        } catch (Throwable throwable) {
-            logger.error(getName() + " Discard msg because put events to both of queue and "
-                    + "fileChannel fail,current resendQueue.size = "
-                    + resendQueue.size(), throwable);
         }
+        if (producerHolder != null) {
+            producerHolder.stop();
+        }
+        super.stop();
+        logger.info(getName() + " stopped!");
     }
 
     @Override
@@ -442,19 +231,20 @@ public class TubeSink extends AbstractSink implements Configurable {
                 } else {
                     dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
                 }
-                if (!eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
-                    logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
-                            + "--> Tube,Check if Tube server or network is ok.(if this situation last long time "
-                            + "it will cause memoryChannel full and fileChannel write.)", getName());
-                    tx.rollback();
-                    // metric
+                if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
+                    tx.commit();
+                    cachedMsgCnt.incrementAndGet();
                     DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
-                    metricItem.readFailCount.incrementAndGet();
+                    metricItem.readSuccessCount.incrementAndGet();
                     metricItem.readFailSize.addAndGet(event.getBody().length);
                 } else {
-                    tx.commit();
+                    tx.rollback();
+                    //logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
+                    //        + "--> TubeMQ, check if TubeMQ server or network is ok.(if this situation last long time "
+                    //        + "it will cause memoryChannel full and fileChannel write.)", getName());
+                    // metric
                     DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
-                    metricItem.readSuccessCount.incrementAndGet();
+                    metricItem.readFailCount.incrementAndGet();
                     metricItem.readFailSize.addAndGet(event.getBody().length);
                 }
             } else {
@@ -475,196 +265,116 @@ public class TubeSink extends AbstractSink implements Configurable {
         return status;
     }
 
-    @Override
-    public void configure(Context context) {
-        logger.info("configure from context: {}", context);
-
-        configManager = ConfigManager.getInstance();
-        topicProperties = configManager.getTopicProperties();
-        masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet();
-        tubeConfig = configManager.getMqClusterConfig();
-        configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
-            @Override
-            public void update() {
-                diffSetPublish(new HashSet<>(topicProperties.values()),
-                        new HashSet<>(configManager.getTopicProperties().values()));
-            }
-        });
-        configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
-            @Override
-            public void update() {
-                diffUpdateTubeClient(masterHostAndPortLists, configManager.getMqClusterUrl2Token().keySet());
-            }
-        });
-
-        producerInfoMap = new ConcurrentHashMap<>();
-        masterUrl2producers = new ConcurrentHashMap<>();
-        // start message deduplication handler
-        msgDedupHandler.start(tubeConfig.getClientIdCache(),
-                tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize());
-
-        int badEventQueueSize = tubeConfig.getBadEventQueueSize();
-        Preconditions.checkArgument(badEventQueueSize > 0, "badEventQueueSize must be > 0");
-        resendQueue = new LinkedBlockingQueue<>(badEventQueueSize);
-
-        int threadNum = tubeConfig.getThreadNum();
-        Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
-        sinkThreadPool = new Thread[threadNum];
-        int eventQueueSize = tubeConfig.getEventQueueSize();
-        Preconditions.checkArgument(eventQueueSize > 0, "eventQueueSize must be > 0");
-        eventQueue = new LinkedBlockingQueue<>(eventQueueSize);
-
-        if (tubeConfig.getDiskIoRatePerSec() != 0) {
-            diskRateLimiter = RateLimiter.create(tubeConfig.getDiskIoRatePerSec());
-        }
-
-    }
-
-    private Map<String, String> getNewDimension(String otherKey, String value) {
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
-        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
-        dimensions.put(otherKey, value);
-        return dimensions;
-    }
-
-    /**
-     * get metricItemSet
-     *
-     * @return the metricItemSet
-     */
-    public DataProxyMetricItemSet getMetricItemSet() {
-        return metricItemSet;
-    }
-
-    class SinkTask implements Runnable {
-
-        private void sendMessage(MessageProducer producer, Event event,
-                                 String topic, AtomicBoolean flag, EventStat es)
-                throws TubeClientException, InterruptedException {
-            if (msgDedupHandler.judgeDupAndPutMsgSeqId(
-                    event.getHeaders().get(ConfigConstants.SEQUENCE_ID))) {
-                logger.info("{} agent package {} existed,just discard.",
-                        getName(), event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
-            } else {
-                Message message = new Message(topic, event.getBody());
-                message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
-                String streamId = "";
-                if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
-                    streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
-                } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
-                    streamId = event.getHeaders().get(AttributeConstants.INAME);
-                }
-                message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-                producer.sendMessage(message, new MyCallback(es));
-                flag.set(true);
-            }
-            illegalTopicMap.remove(topic);
-        }
-
-        private void handleException(Throwable t, String topic, boolean decrementFlag, EventStat es) {
-            if (t instanceof TubeClientException) {
-                String message = t.getMessage();
-                if (message != null && (message.contains("No available queue for topic")
-                        || message.contains("The brokers of topic are all forbidden"))) {
-                    illegalTopicMap.put(topic, System.currentTimeMillis() + 60 * 1000);
-                    logger.info("IllegalTopicMap.put " + topic);
-                    return;
-                } else {
-                    try {
-                        Thread.sleep(100);
-                    } catch (InterruptedException e) {
-                        //ignore..
-                    }
-                }
-            }
-            logger.error("Sink task fail to send the message, decrementFlag=" + decrementFlag + ",sink.name="
-                    + Thread.currentThread().getName()
-                    + ",event.headers=" + es.getEvent().getHeaders(), t);
+    private class TubeSinkTask implements Runnable {
+        public TubeSinkTask() {
+            // ignore
         }
 
         @Override
         public void run() {
+            Event event = null;
+            EventStat es = null;
+            String topic = null;
+            boolean sendFinished = false;
+            MessageProducer producer = null;
             logger.info("sink task {} started.", Thread.currentThread().getName());
             while (canSend) {
-                boolean decrementFlag = false;
-                boolean resendBadEvent = false;
-                Event event = null;
-                EventStat es = null;
-                String topic = null;
                 try {
-                    if (TubeSink.this.overflow) {
-                        TubeSink.this.overflow = false;
-                        Thread.sleep(10);
+                    if (isOverFlow) {
+                        isOverFlow = false;
+                        Thread.sleep(30);
                     }
+                    event = null;
+                    topic = null;
+                    // get event from queues
                     if (!resendQueue.isEmpty()) {
                         es = resendQueue.poll();
-                        if (es != null) {
-                            event = es.getEvent();
-                            // logger.warn("Resend event: {}", event.toString());
-                            if (event.getHeaders().containsKey(TOPIC)) {
-                                topic = event.getHeaders().get(TOPIC);
-                            }
-                            resendBadEvent = true;
+                        if (es == null) {
+                            continue;
+                        }
+                        resendMsgCnt.decrementAndGet();
+                        event = es.getEvent();
+                        if (event.getHeaders().containsKey(TOPIC)) {
+                            topic = event.getHeaders().get(TOPIC);
                         }
                     } else {
-                        event = eventQueue.take();
+                        event = eventQueue.poll(2000, TimeUnit.MILLISECONDS);
+                        if (event == null) {
+                            if (!canTake && takenMsgCnt.get() <= 0) {
+                                logger.info("Found canTake is false and taken message count is zero, break!");
+                                break;
+                            }
+                            continue;
+                        }
+                        cachedMsgCnt.decrementAndGet();
+                        takenMsgCnt.incrementAndGet();
                         es = new EventStat(event);
-//                            sendCnt.incrementAndGet();
                         if (event.getHeaders().containsKey(TOPIC)) {
                             topic = event.getHeaders().get(TOPIC);
                         }
                     }
-
-                    if (event == null) {
-                        // ignore event is null, when multiple-thread SinkTask running
-                        // this null value comes from resendQueue
-                        continue;
-                    }
-
-                    if (topic == null || topic.equals("")) {
-                        logger.warn("no topic specified in event header, just skip this event");
-                        continue;
-                    }
-
-                    Long expireTime = illegalTopicMap.get(topic);
-                    if (expireTime != null) {
-                        long currentTime = System.currentTimeMillis();
-                        if (expireTime > currentTime) {
-                            // TODO: need to be improved.
-//                            reChannelEvent(es, topic);
-                            continue;
-                        } else {
-                            illegalTopicMap.remove(topic);
+                    // valid event status
+                    if (StringUtils.isBlank(topic)) {
+                        blankTopicDiscardMsgCnt.incrementAndGet();
+                        takenMsgCnt.decrementAndGet();
+                        if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+                            logger.error("No topic specified, just discard the event, event header is "
+                                    + event.getHeaders().toString());
                         }
-                    }
-                    MessageProducer producer = null;
-                    try {
-                        producer = getProducer(topic);
-                    } catch (Exception e) {
-                        logger.error("Get producer failed!", e);
-                    }
-
-                    if (producer == null) {
-                        illegalTopicMap.put(topic, System.currentTimeMillis() + 30 * 1000);
                         continue;
                     }
-
-                    AtomicBoolean flagAtomic = new AtomicBoolean(decrementFlag);
-                    sendMessage(producer, event, topic, flagAtomic, es);
-                    decrementFlag = flagAtomic.get();
+                    // send message
+                    sendFinished = sendMessage(es, event, topic);
                 } catch (InterruptedException e) {
                     logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
                     return;
                 } catch (Throwable t) {
-                    handleException(t, topic, decrementFlag, es);
-                    resendEvent(es, decrementFlag);
+                    resendEvent(es, sendFinished);
+                    if (t instanceof TubeClientException) {
+                        String message = t.getMessage();
+                        if (message != null && (message.contains("No available queue for topic")
+                                || message.contains("The brokers of topic are all forbidden"))) {
+                            isOverFlow = true;
+                        }
+                    }
+                    if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+                        logger.error("Sink task fail to send the message, finished =" + sendFinished
+                                + ",sink.name=" + Thread.currentThread().getName()
+                                + ",event.headers=" + es.getEvent().getHeaders(), t);
+                    }
                 }
             }
+            logger.info("sink task {} stopped!", Thread.currentThread().getName());
+        }
+
+        private boolean sendMessage(EventStat es, Event event, String topic) throws Exception {
+            MessageProducer producer = producerHolder.getProducer(topic);
+            if (producer == null) {
+                frozenTopicDiscardMsgCnt.incrementAndGet();
+                takenMsgCnt.decrementAndGet();
+                if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+                    logger.error("Get producer failed for " + topic);
+                }
+                return false;
+            }
+            if (MSG_DEDUP_HANDLER.judgeDupAndPutMsgSeqId(
+                    event.getHeaders().get(ConfigConstants.SEQUENCE_ID))) {
+                dupDiscardMsgCnt.incrementAndGet();
+                takenMsgCnt.decrementAndGet();
+                logger.info("{} agent package {} existed,just discard.",
+                        Thread.currentThread().getName(),
+                        event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
+                return false;
+            } else {
+                producer.sendMessage(TubeUtils.buildMessage(
+                        topic, event, false), new MyCallback(es));
+                inflightMsgCnt.incrementAndGet();
+                return true;
+            }
         }
     }
 
-    public class MyCallback implements MessageSentCallback {
+    private class MyCallback implements MessageSentCallback {
 
         private EventStat myEventStat;
         private long sendTime;
@@ -677,7 +387,9 @@ public class TubeSink extends AbstractSink implements Configurable {
         @Override
         public void onMessageSent(final MessageSentResult result) {
             if (result.isSuccess()) {
-                // TODO: add stats
+                successMsgCnt.incrementAndGet();
+                inflightMsgCnt.decrementAndGet();
+                takenMsgCnt.decrementAndGet();
                 this.addMetric(myEventStat.getEvent(), true, sendTime);
             } else {
                 this.addMetric(myEventStat.getEvent(), false, 0);
@@ -685,10 +397,9 @@ public class TubeSink extends AbstractSink implements Configurable {
                     logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
                             result.getErrMsg(), resendQueue.size(),
                             myEventStat.getEvent().hashCode());
-
                     return;
-                }
-                if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW) {
+                } else if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW
+                        && LOG_SINK_TASK_PRINTER.shouldPrint()) {
                     logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
                             result.getErrMsg(), resendQueue.size(),
                             myEventStat.getEvent().hashCode());
@@ -731,65 +442,177 @@ public class TubeSink extends AbstractSink implements Configurable {
 
         @Override
         public void onException(final Throwable e) {
-            Throwable t = e;
-            while (t.getCause() != null) {
-                t = t.getCause();
-            }
-            if (t instanceof OverflowException) {
-                TubeSink.this.overflow = true;
-            }
             resendEvent(myEventStat, true);
         }
     }
 
-    class TopicProducerInfo {
-
-        private TubeMultiSessionFactory sessionFactory;
-        private MessageProducer producer;
-        private Set<String> topicSet;
+    private class TubeStatsTask implements Runnable {
 
-        public TopicProducerInfo(TubeMultiSessionFactory sessionFactory) {
-            this.sessionFactory = sessionFactory;
+        @Override
+        public void run() {
+            if (!canTake && takenMsgCnt.get() <= 0) {
+                return;
+            }
+            logger.info(getName() + "[TubeSink Stats] cachedMsgCnt=" + cachedMsgCnt.get()
+                    + ", takenMsgCnt=" + takenMsgCnt.get()
+                    + ", resendMsgCnt=" + resendMsgCnt.get()
+                    + ", blankTopicDiscardMsgCnt=" + blankTopicDiscardMsgCnt.get()
+                    + ", frozenTopicDiscardMsgCnt=" + frozenTopicDiscardMsgCnt.get()
+                    + ", dupDiscardMsgCnt=" + dupDiscardMsgCnt.get()
+                    + ", inflightMsgCnt=" + inflightMsgCnt.get()
+                    + ", successMsgCnt=" + successMsgCnt.get());
         }
+    }
 
-        public void shutdown() {
-            if (producer != null) {
-                try {
-                    producer.shutdown();
-                } catch (Throwable e) {
-                    logger.error("destroy producer error in tube sink", e);
+    /**
+     * resend event
+     */
+    private void resendEvent(EventStat es, boolean sendFinished) {
+        try {
+            if (sendFinished) {
+                inflightMsgCnt.decrementAndGet();
+            }
+            if (es == null || es.getEvent() == null) {
+                takenMsgCnt.decrementAndGet();
+                return;
+            }
+            MSG_DEDUP_HANDLER.invalidMsgSeqId(es.getEvent()
+                    .getHeaders().get(ConfigConstants.SEQUENCE_ID));
+            if (resendQueue.offer(es)) {
+                resendMsgCnt.incrementAndGet();
+            } else {
+                FailoverChannelProcessorHolder.getChannelProcessor().processEvent(es.getEvent());
+                takenMsgCnt.decrementAndGet();
+                if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+                    logger.error(Thread.currentThread().getName()
+                            + " Channel --> Tube --> ResendQueue(full) -->"
+                            + "FailOverChannelProcessor(current code point),"
+                            + " Resend queue is full,Check if Tube server or network is ok.");
                 }
             }
+        } catch (Throwable throwable) {
+            takenMsgCnt.decrementAndGet();
+            if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+                logger.error(getName() + " Discard msg because put events to both of queue and "
+                        + "fileChannel fail,current resendQueue.size = "
+                        + resendQueue.size(), throwable);
+            }
         }
+    }
 
-        public void initProducer() {
-            if (sessionFactory == null) {
-                logger.error("sessionFactory is null, can't create producer");
-                return;
+    private Map<String, String> getNewDimension(String otherKey, String value) {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
+        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
+        dimensions.put(otherKey, value);
+        return dimensions;
+    }
+
+    /**
+     * Differentiate unpublished topic sets and publish them
+     * attention: only append added topics
+     *
+     * @param curTopicSet   the current used topic set
+     * @param newTopicSet   the latest configured topic set
+     */
+    private void diffSetPublish(Set<String> curTopicSet, Set<String> newTopicSet) {
+        if (!this.started.get()) {
+            logger.info(getName() + " not started, ignore this change!");
+        }
+        if (SetUtils.isEqualSet(curTopicSet, newTopicSet)) {
+            return;
+        }
+        // filter unpublished topics
+        Set<String> addedTopics = new HashSet<>();
+        for (String topic : newTopicSet) {
+            if (StringUtils.isBlank(topic)) {
+                continue;
             }
-            try {
-                this.producer = sessionFactory.createProducer();
-            } catch (TubeClientException e) {
-                logger.error("create tube messageProducer error in tubesink, ex {}", e.getMessage());
+            if (!curTopicSet.contains(topic)) {
+                addedTopics.add(topic);
             }
         }
-
-        public Set<String> publishTopic(Set<String> topicSet) {
+        // publish them
+        if (!addedTopics.isEmpty()) {
             try {
-                this.topicSet = producer.publish(topicSet);
-            } catch (TubeClientException e) {
-                logger.info(getName() + " meta sink initTopicSet fail.", e);
+                producerHolder.createProducersByTopicSet(addedTopics);
+            } catch (Exception e) {
+                logger.info(getName() + "'s publish new topic set fail.", e);
             }
-            return this.topicSet;
+            logger.info(getName() + "'s topics set has changed, trigger diff publish for {}",
+                    addedTopics);
+            topicProperties = configManager.getTopicProperties();
         }
+    }
 
-        public MessageProducer getProducer() {
-            return producer;
+    /**
+     * When masterUrlLists change, update tubeClient
+     * Requirement: when switching the Master cluster,
+     * the DataProxy node must not do the data reporting service
+     *
+     * @param curClusterSet previous masterHostAndPortList set
+     * @param newClusterSet new masterHostAndPortList set
+     */
+    private void diffUpdateTubeClient(Set<String> curClusterSet,
+                                      Set<String> newClusterSet) {
+        if (!this.started.get()) {
+            logger.info(getName() + " not started, ignore this change!");
+        }
+        if (newClusterSet == null || newClusterSet.isEmpty()
+                || SetUtils.isEqualSet(curClusterSet, newClusterSet)
+                || newClusterSet.contains(usedMasterAddr)) {
+            return;
+        }
+        String newMasterAddr = getFirstClusterAddr(newClusterSet);
+        if (newMasterAddr == null) {
+            return;
+        }
+        TubeProducerHolder newProducerHolder = new TubeProducerHolder(getName(),
+                newMasterAddr, configManager.getMqClusterConfig());
+        try {
+            newProducerHolder.start(new HashSet<>(configManager.getTopicProperties().values()));
+        } catch (Throwable e) {
+            logger.error(getName() + " create new producer holder for " + newMasterAddr
+                            + " failure, throw exception is  {}", e.getMessage());
+            return;
         }
+        // replace current producer holder
+        final String tmpMasterAddr = usedMasterAddr;
+        TubeProducerHolder tmpProducerHolder = producerHolder;
+        producerHolder = newProducerHolder;
+        usedMasterAddr = newMasterAddr;
+        // close old producer holder
+        tmpProducerHolder.stop();
+        logger.info(getName() + " switch cluster from "
+                + tmpMasterAddr + " to " + usedMasterAddr);
+    }
 
-        public Set<String> getTopicSet() {
-            return this.topicSet;
+    /**
+     * Get first cluster address
+     *
+     * @param clusterSet  cluster set configure
+     * @return  the selected cluster address
+     *          null if set is empty or if items are all blank
+     */
+    private String getFirstClusterAddr(Set<String> clusterSet) {
+        String tmpMasterAddr = null;
+        for (String masterAddr : clusterSet) {
+            if (StringUtils.isBlank(masterAddr)) {
+                continue;
+            }
+            tmpMasterAddr = masterAddr;
+            break;
         }
+        return tmpMasterAddr;
+    }
+
+    /**
+     * get metricItemSet
+     *
+     * @return the metricItemSet
+     */
+    private DataProxyMetricItemSet getMetricItemSet() {
+        return metricItemSet;
     }
 
 }
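
Producer construction is now delegated to the new TubeProducerHolder (diff below). A minimal
usage fragment mirroring the call sites in TubeSink above; the sink name, master address and
topic literals are illustrative values, not taken from this patch:

    TubeProducerHolder holder = new TubeProducerHolder("tube-sink-1",
            "127.0.0.1:8715", configManager.getMqClusterConfig());
    // publish all currently configured topics once at sink start
    holder.start(new HashSet<>(topicProperties.values()));
    // per-event: look up the producer for a topic (may be null, e.g. for a frozen topic)
    MessageProducer producer = holder.getProducer("test_topic");
    if (producer != null) {
        producer.sendMessage(TubeUtils.buildMessage("test_topic", event, false),
                new MyCallback(eventStat));
    }
    // publish topics added later by a configuration update
    holder.createProducersByTopicSet(addedTopics);
    // shut down the holder when the sink stops
    holder.stop();
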
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/MsgDedupHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/MsgDedupHandler.java
index 297fbc769..ba3dec596 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/MsgDedupHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/MsgDedupHandler.java
@@ -19,6 +19,7 @@ package org.apache.inlong.dataproxy.sink.common;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
+import com.google.common.cache.CacheStats;
 import com.google.common.cache.LoadingCache;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -95,4 +96,11 @@ public class MsgDedupHandler {
         }
         return "Disable for message data deduplication function";
     }
+
+    public CacheStats getCacheData() {
+        if (enableDataDedup) {
+            return msgSeqIdCache.stats();
+        }
+        return null;
+    }
 }
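
The new getCacheData() accessor exposes the Guava cache statistics only when deduplication is enabled, otherwise it returns null; a minimal caller sketch follows (how the handler instance is obtained and enabled is outside this diff and assumed here):

    import com.google.common.cache.CacheStats;
    import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;

    public class DedupStatsSketch {
        // logs a couple of cache statistics, guarding against the disabled case
        static void report(MsgDedupHandler handler) {
            CacheStats stats = handler.getCacheData();
            if (stats == null) {
                System.out.println("message deduplication is disabled");
                return;
            }
            System.out.println("dedup cache hitRate=" + stats.hitRate()
                    + ", evictionCount=" + stats.evictionCount());
        }
    }
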
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
new file mode 100644
index 000000000..c1852fed3
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.common;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flume.FlumeException;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TubeProducerHolder {
+    private static final Logger logger =
+            LoggerFactory.getLogger(TubeProducerHolder.class);
+    private static final long SEND_FAILURE_WAIT = 30000L;
+    private static final long PUBLISH_FAILURE_WAIT = 60000L;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final String sinkName;
+    private final String clusterAddr;
+    private final MQClusterConfig clusterConfig;
+    private TubeMultiSessionFactory sessionFactory = null;
+    private final Map<String, MessageProducer> producerMap = new ConcurrentHashMap<>();
+    private MessageProducer lastProducer = null;
+    private final AtomicInteger lastPubTopicCnt = new AtomicInteger(0);
+    private static final ConcurrentHashMap<String, AtomicLong> FROZEN_TOPIC_MAP
+            = new ConcurrentHashMap<>();
+
+    public TubeProducerHolder(String sinkName, String clusterAddr, MQClusterConfig tubeConfig) {
+        Preconditions.checkState(StringUtils.isNotBlank(clusterAddr),
+                "No TubeMQ's cluster address list specified");
+        this.sinkName = sinkName;
+        this.clusterAddr = clusterAddr;
+        this.clusterConfig = tubeConfig;
+    }
+
+    public void start(Set<String> configTopicSet) {
+        if (!this.started.compareAndSet(false, true)) {
+            logger.info("ProducerHolder for " + sinkName + " has started!");
+            return;
+        }
+        logger.info("ProducerHolder for " + sinkName + " begin to start!");
+        // create session factory
+        try {
+            TubeClientConfig clientConfig = TubeUtils.buildClientConfig(clusterAddr, this.clusterConfig);
+            this.sessionFactory = new TubeMultiSessionFactory(clientConfig);
+            createProducersByTopicSet(configTopicSet);
+        } catch (Throwable e) {
+            stop();
+            String errInfo = "Build session factory  to " + clusterAddr
+                    + " for " + sinkName + " failure, please re-check";
+            logger.error(errInfo, e);
+            throw new FlumeException(errInfo);
+        }
+        logger.info("ProducerHolder for " + sinkName + " started!");
+    }
+
+    public void stop() {
+        if (!this.started.get()) {
+            return;
+        }
+        // change start flag
+        if (!this.started.compareAndSet(true, false)) {
+            logger.info("ProducerHolder for " + sinkName + " has stopped!");
+            return;
+        }
+        logger.info("ProducerHolder for " + sinkName + " begin to stop!");
+        for (Map.Entry<String, MessageProducer> entry : producerMap.entrySet()) {
+            if (entry == null || entry.getValue() == null) {
+                continue;
+            }
+            try {
+                entry.getValue().shutdown();
+            } catch (Throwable e) {
+                // ignore log
+            }
+        }
+        producerMap.clear();
+        lastProducer = null;
+        lastPubTopicCnt.set(0);
+        FROZEN_TOPIC_MAP.clear();
+        if (sessionFactory != null) {
+            try {
+                sessionFactory.shutdown();
+            } catch (Throwable e) {
+                // ignore log
+            }
+            sessionFactory = null;
+        }
+        logger.info("ProducerHolder for " + sinkName + " finished stop!");
+    }
+
+    /**
+     * Get producer by topic name:
+     *   i. if the topic is judged to be an illegal topic, return null;
+     *   ii. if it is not an illegal topic or the status has expired, check:
+     *    a. if the topic has been published before, return the corresponding producer directly;
+     *    b. if the topic is not in the published list, perform the topic's publish action.
+     *  If an exception is thrown while publishing the topic,
+     *     the topic is marked as illegal
+     *
+     * @param topicName  the topic name
+     *
+     * @return  the producer
+     *          if topic is illegal, return null
+     * @throws  TubeClientException
+     */
+    public MessageProducer getProducer(String topicName) throws TubeClientException {
+        AtomicLong fbdTime = FROZEN_TOPIC_MAP.get(topicName);
+        if (fbdTime != null && fbdTime.get() > System.currentTimeMillis()) {
+            return null;
+        }
+        MessageProducer tmpProducer = producerMap.get(topicName);
+        if (tmpProducer != null) {
+            if (fbdTime != null) {
+                FROZEN_TOPIC_MAP.remove(topicName);
+            }
+            return tmpProducer;
+        }
+        synchronized (lastPubTopicCnt) {
+            fbdTime = FROZEN_TOPIC_MAP.get(topicName);
+            if (fbdTime != null && fbdTime.get() > System.currentTimeMillis()) {
+                return null;
+            }
+            if (lastProducer == null
+                    || lastPubTopicCnt.get() >= clusterConfig.getMaxTopicsEachProducerHold()) {
+                lastProducer = sessionFactory.createProducer();
+                lastPubTopicCnt.set(0);
+            }
+            try {
+                lastProducer.publish(topicName);
+            } catch (Throwable e) {
+                fbdTime = FROZEN_TOPIC_MAP.get(topicName);
+                if (fbdTime == null) {
+                    AtomicLong tmpFbdTime = new AtomicLong();
+                    fbdTime = FROZEN_TOPIC_MAP.putIfAbsent(topicName, tmpFbdTime);
+                    if (fbdTime == null) {
+                        fbdTime = tmpFbdTime;
+                    }
+                }
+                fbdTime.set(System.currentTimeMillis() + PUBLISH_FAILURE_WAIT);
+                logger.warn("Throw exception while publish topic="
+                        + topicName + ", exception is " + e.getMessage());
+                return null;
+            }
+            producerMap.put(topicName, lastProducer);
+            lastPubTopicCnt.incrementAndGet();
+            return lastProducer;
+        }
+    }
+
+    /**
+     * Decide whether production to a topic should be frozen, based on the
+     * exception returned while sending a message.
+     *
+     * @param topicName  the topic the message was sent to
+     * @param throwable  the exception thrown while sending the message
+     *
+     * @return  whether the topic should be frozen
+     */
+    public boolean needFrozenSent(String topicName, Throwable throwable) {
+        if (throwable instanceof TubeClientException) {
+            String message = throwable.getMessage();
+            if (message != null && (message.contains("No available partition for topic")
+                    || message.contains("The brokers of topic are all forbidden"))) {
+                AtomicLong fbdTime = FROZEN_TOPIC_MAP.get(topicName);
+                if (fbdTime == null) {
+                    AtomicLong tmpFbdTime = new AtomicLong(0);
+                    fbdTime = FROZEN_TOPIC_MAP.putIfAbsent(topicName, tmpFbdTime);
+                    if (fbdTime == null) {
+                        fbdTime = tmpFbdTime;
+                    }
+                }
+                fbdTime.set(System.currentTimeMillis() + SEND_FAILURE_WAIT);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Create sink producers for the configured topic set.
+     * The topic set is split into groups; each group is served by one producer.
+     *
+     * @param cfgTopicSet  the configured topic set
+     */
+    public void createProducersByTopicSet(Set<String> cfgTopicSet) throws Exception {
+        if (cfgTopicSet == null || cfgTopicSet.isEmpty()) {
+            return;
+        }
+        // filter published topics
+        List<String> filteredTopics = new ArrayList<>(cfgTopicSet.size());
+        for (String topicName : cfgTopicSet) {
+            if (StringUtils.isBlank(topicName)
+                    || producerMap.get(topicName) != null) {
+                continue;
+            }
+            filteredTopics.add(topicName);
+        }
+        if (filteredTopics.isEmpty()) {
+            return;
+        }
+        // alloc topic count
+        Collections.sort(filteredTopics);
+        long startTime = System.currentTimeMillis();
+        int maxPublishTopicCnt = clusterConfig.getMaxTopicsEachProducerHold();
+        int allocTotalCnt = filteredTopics.size();
+        List<Integer> topicGroupCnt = new ArrayList<>();
+        int paddingCnt = (lastPubTopicCnt.get() <= 0)
+                ? 0 : (maxPublishTopicCnt - lastPubTopicCnt.get());
+        while (allocTotalCnt > 0) {
+            if (paddingCnt > 0) {
+                topicGroupCnt.add(Math.min(allocTotalCnt, paddingCnt));
+                allocTotalCnt -= paddingCnt;
+                paddingCnt = 0;
+            } else {
+                topicGroupCnt.add(Math.min(allocTotalCnt, maxPublishTopicCnt));
+                allocTotalCnt -= maxPublishTopicCnt;
+            }
+        }
+        // create producer
+        int startPos = 0;
+        int endPos = 0;
+        Set<String> subTopicSet = new HashSet<>();
+        for (Integer dltCnt : topicGroupCnt) {
+            // allocate topic items
+            subTopicSet.clear();
+            endPos = startPos + dltCnt;
+            for (int index = startPos; index < endPos; index++) {
+                subTopicSet.add(filteredTopics.get(index));
+            }
+            startPos = endPos;
+            // create producer
+            if (lastProducer == null
+                    || lastPubTopicCnt.get() == maxPublishTopicCnt) {
+                lastProducer = sessionFactory.createProducer();
+                lastPubTopicCnt.set(0);
+            }
+            lastProducer.publish(subTopicSet);
+            lastPubTopicCnt.addAndGet(subTopicSet.size());
+            for (String topicItem : subTopicSet) {
+                producerMap.put(topicItem, lastProducer);
+            }
+        }
+        logger.info(sinkName + " initializes producers for topics:"
+                + producerMap.keySet() + ", cost: " + (System.currentTimeMillis() - startTime)
+                + "ms");
+    }
+
+}
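
Putting the pieces together, a sink drives the holder roughly as sketched below; the sink name, master address and topic are illustrative, and constructing MQClusterConfig directly with defaults is an assumption made for brevity (TubeSink passes its configured instance):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
    import org.apache.inlong.dataproxy.sink.common.TubeProducerHolder;
    import org.apache.inlong.tubemq.client.producer.MessageProducer;

    public class ProducerHolderSketch {
        public static void main(String[] args) throws Exception {
            Set<String> topics = new HashSet<>();
            topics.add("test_topic");
            TubeProducerHolder holder = new TubeProducerHolder(
                    "demo-sink", "127.0.0.1:8715", new MQClusterConfig());
            // builds the session factory and publishes the configured topics
            holder.start(topics);
            // returns null if the topic is currently frozen or its publish failed
            MessageProducer producer = holder.getProducer("test_topic");
            if (producer != null) {
                // producer.sendMessage(...) would go here
            }
            holder.stop();
        }
    }
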
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
new file mode 100644
index 000000000..6a67978f8
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.common;
+
+import java.util.Map;
+import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.NetworkUtils;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.corebase.Message;
+
+public class TubeUtils {
+
+    /**
+     * Build the TubeMQ client configuration
+     *
+     * @param clusterAddr    the TubeMQ cluster address
+     * @param tubeConfig     the TubeMQ cluster configuration
+     * @return   the TubeClientConfig object
+     */
+    public static TubeClientConfig buildClientConfig(String clusterAddr, MQClusterConfig tubeConfig) {
+        final TubeClientConfig tubeClientConfig = new TubeClientConfig(clusterAddr);
+        tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(tubeConfig.getLinkMaxAllowedDelayedMsgCount());
+        tubeClientConfig.setSessionWarnDelayedMsgCount(tubeConfig.getSessionWarnDelayedMsgCount());
+        tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(tubeConfig.getSessionMaxAllowedDelayedMsgCount());
+        tubeClientConfig.setNettyWriteBufferHighWaterMark(tubeConfig.getNettyWriteBufferHighWaterMark());
+        tubeClientConfig.setHeartbeatPeriodMs(tubeConfig.getTubeHeartbeatPeriodMs());
+        tubeClientConfig.setRpcTimeoutMs(tubeConfig.getTubeRpcTimeoutMs());
+        return tubeClientConfig;
+    }
+
+    /**
+     * Build TubeMQ's message
+     *
+     * @param topicName      the topic name of message
+     * @param event          the DataProxy event
+     * @param addExtraAttrs  whether to add extra attributes
+     * @return   the message object
+     */
+    public static Message buildMessage(String topicName,
+                                       Event event, boolean addExtraAttrs) {
+        Message message = new Message(topicName, event.getBody());
+        message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
+        String streamId = "";
+        if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
+            streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
+        } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
+            streamId = event.getHeaders().get(AttributeConstants.INAME);
+        }
+        message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+        if (addExtraAttrs) {
+            // common attributes
+            Map<String, String> headers = event.getHeaders();
+            message.setAttrKeyVal(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID));
+            message.setAttrKeyVal(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID));
+            message.setAttrKeyVal(Constants.TOPIC, headers.get(Constants.TOPIC));
+            message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME));
+            message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP));
+        }
+        return message;
+    }
+}
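
For illustration, building a message from a Flume event with the new helper might look like the sketch below; the body, stream id and package time values are made up, while the header keys reuse the constants referenced above:

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    import org.apache.inlong.dataproxy.consts.AttributeConstants;
    import org.apache.inlong.dataproxy.consts.ConfigConstants;
    import org.apache.inlong.dataproxy.sink.common.TubeUtils;
    import org.apache.inlong.tubemq.corebase.Message;

    public class BuildMessageSketch {
        public static void main(String[] args) {
            Map<String, String> headers = new HashMap<>();
            headers.put(AttributeConstants.STREAM_ID, "demo_stream");    // illustrative stream id
            headers.put(ConfigConstants.PKG_TIME_KEY, "20220815105600"); // illustrative package time
            Event event = EventBuilder.withBody(
                    "hello".getBytes(StandardCharsets.UTF_8), headers);
            // addExtraAttrs=false: only dataproxyip and the system header are attached
            Message message = TubeUtils.buildMessage("test_topic", event, false);
            // the message can now be handed to a MessageProducer obtained from TubeProducerHolder
        }
    }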