You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/10 11:22:12 UTC

[inlong] 02/03: [INLONG-7191][DataProxy] Remove unused code (#7192)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 9c9243d284437b4526c603194c54bfe9292edd51
Author: woofyzhao <zh...@gmail.com>
AuthorDate: Tue Jan 10 15:58:32 2023 +0800

    [INLONG-7191][DataProxy] Remove unused code (#7192)
---
 .../apache/inlong/dataproxy/sink/EventStat.java    |  81 ---
 .../apache/inlong/dataproxy/sink/PulsarSink.java   | 725 ---------------------
 .../dataproxy/sink/SimpleMessageTubeSink.java      | 714 --------------------
 .../org/apache/inlong/dataproxy/sink/TubeSink.java | 707 --------------------
 .../dataproxy/sink/mqzone/AbstactZoneWorker.java   | 108 ---
 .../sink/mqzone/AbstractZoneClusterProducer.java   | 115 ----
 .../sink/mqzone/AbstractZoneProducer.java          | 159 -----
 .../dataproxy/sink/mqzone/AbstractZoneSink.java    | 179 -----
 .../sink/mqzone/AbstractZoneSinkContext.java       | 405 ------------
 .../sink/mqzone/ZoneClusterProducerCalculator.java |  27 -
 .../sink/mqzone/ZoneWorkerCalculator.java          |  23 -
 .../impl/kafkazone/KafkaClusterProducer.java       | 148 -----
 .../mqzone/impl/kafkazone/KafkaZoneProducer.java   |  50 --
 .../sink/mqzone/impl/kafkazone/KafkaZoneSink.java  |  45 --
 .../impl/kafkazone/KafkaZoneSinkContext.java       |  44 --
 .../mqzone/impl/kafkazone/KafkaZoneWorker.java     |  52 --
 .../impl/pulsarzone/PulsarClusterProducer.java     | 278 --------
 .../mqzone/impl/pulsarzone/PulsarZoneProducer.java |  45 --
 .../mqzone/impl/pulsarzone/PulsarZoneSink.java     |  45 --
 .../impl/pulsarzone/PulsarZoneSinkContext.java     |  44 --
 .../mqzone/impl/pulsarzone/PulsarZoneWorker.java   |  52 --
 .../mqzone/impl/tubezone/TubeClusterProducer.java  | 205 ------
 .../mqzone/impl/tubezone/TubeZoneProducer.java     |  51 --
 .../sink/mqzone/impl/tubezone/TubeZoneSink.java    |  46 --
 .../mqzone/impl/tubezone/TubeZoneSinkContext.java  |  44 --
 .../sink/mqzone/impl/tubezone/TubeZoneWorker.java  |  51 --
 .../sink/pulsar/CreatePulsarClientCallBack.java    |  25 -
 .../dataproxy/sink/pulsar/PulsarClientService.java | 563 ----------------
 .../dataproxy/sink/pulsar/SendMessageCallBack.java |  33 -
 .../inlong/dataproxy/sink/pulsar/SinkTask.java     | 230 -------
 .../pulsar/federation/PulsarFederationSink.java    | 140 ----
 .../federation/PulsarFederationSinkContext.java    | 198 ------
 .../pulsar/federation/PulsarFederationWorker.java  | 148 -----
 .../pulsar/federation/PulsarProducerCluster.java   | 319 ---------
 .../federation/PulsarProducerFederation.java       | 163 -----
 .../inlong/dataproxy/sink/TestPulsarSink.java      |  56 --
 .../apache/inlong/dataproxy/sink/TestTubeSink.java |  57 --
 .../federation/TestPulsarFederationSink.java       |  87 ---
 .../federation/TestPulsarProducerFederation.java   |  95 ---
 39 files changed, 6557 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/EventStat.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/EventStat.java
deleted file mode 100644
index 3bf635e15..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/EventStat.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.flume.Event;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
-
-public class EventStat {
-
-    private static long RETRY_INTERVAL_MS = 1 * 1000L;
-    private Event event;
-    private AtomicInteger myRetryCnt = new AtomicInteger(0);
-    private boolean isOrderMessage = false;
-    private boolean isSinkRspType = false;
-
-    public EventStat(Event event) {
-        this.event = event;
-        this.isSinkRspType = MessageUtils.isSinkRspType(event);
-        this.isOrderMessage = MessageUtils.isSyncSendForOrder(event);
-    }
-
-    public EventStat(Event event, int retryCnt) {
-        this.event = event;
-        this.myRetryCnt.set(retryCnt);
-        this.isSinkRspType = MessageUtils.isSinkRspType(event);
-        this.isOrderMessage = MessageUtils.isSyncSendForOrder(event);
-    }
-
-    public Event getEvent() {
-        return event;
-    }
-
-    public void setEvent(Event event) {
-        this.event = event;
-    }
-
-    public int getRetryCnt() {
-        return myRetryCnt.get();
-    }
-
-    public void setRetryCnt(int retryCnt) {
-        this.myRetryCnt.set(retryCnt);
-    }
-
-    public void incRetryCnt() {
-        this.myRetryCnt.incrementAndGet();
-    }
-
-    public boolean isOrderMessage() {
-        return isOrderMessage;
-    }
-
-    public boolean isSinkRspType() {
-        return isSinkRspType;
-    }
-
-    public boolean shouldDrop() {
-        return false;
-    }
-
-    public void reset() {
-        this.event = null;
-        this.myRetryCnt.set(0);
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
deleted file mode 100644
index 25cf65417..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ /dev/null
@@ -1,725 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink;
-
-import static org.apache.inlong.dataproxy.consts.AttrConstants.SEP_HASHTAG;
-
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.RateLimiter;
-import javax.annotation.Nonnull;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import io.netty.handler.codec.TooLongFrameException;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.monitor.LogCounter;
-import org.apache.inlong.common.monitor.MonitorIndex;
-import org.apache.inlong.common.monitor.MonitorIndexExt;
-import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.common.util.NetworkUtils;
-import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
-import org.apache.inlong.dataproxy.base.SinkRspEvent;
-import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
-import org.apache.inlong.dataproxy.sink.pulsar.CreatePulsarClientCallBack;
-import org.apache.inlong.dataproxy.sink.pulsar.PulsarClientService;
-import org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack;
-import org.apache.inlong.dataproxy.sink.pulsar.SinkTask;
-import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
-import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
-import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
-import org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Use pulsarSink need adding such config, if these ara not config in dataproxy-pulsar.conf,
- * PulsarSink will use default value.
- * <p/>
- *
- * Prefix of Pulsar sink config in flume.conf like this XXX.sinks.XXX.property and properties are may these
- * configurations:
- * <p/>
- * <code>type</code> (*): value must be 'org.apache.inlong.dataproxy.sink.PulsarSink'
- * <p/>
- * <code>pulsar_server_url_list</code> (*): value is pulsar broker url, like 'pulsar://127.0.0.1:6650'
- * <p/>
- * <code>send_timeout_MILL</code>: send message timeout, unit is millisecond, default is 30000 (mean 30s)
- * <p/>
- * <code>stat_interval_sec</code>: stat info will be made period time, unit is second, default is 60s
- * <p/>
- * <code>thread-num</code>: sink thread num, default is 8
- * <p/>
- * <code>client-id-cache</code>: whether the client uses cache, default is true
- * <p/>
- * <code>max_pending_messages</code>: default is 10000
- * <p/>
- * <code>max_batching_messages</code>: default is 1000
- * <p/>
- * <code>enable_batch</code>: default is true
- * <p/>
- * <code>block_if_queue_full</code>: default is true
- */
-public class PulsarSink extends AbstractSink implements Configurable, SendMessageCallBack, CreatePulsarClientCallBack {
-
-    private static final Logger logger = LoggerFactory.getLogger(PulsarSink.class);
-    /**
-     * log tools
-     */
-    private static final LogCounter logPrinterB = new LogCounter(10, 100000, 60 * 1000);
-    private static final LogCounter logPrinterC = new LogCounter(10, 100000, 60 * 1000);
-    private static final String SEPARATOR = "#";
-    private static final Long PRINT_INTERVAL = 30L;
-
-    private static final PulsarPerformanceTask PULSAR_PERFORMANCE_TASK = new PulsarPerformanceTask();
-    private static final LoadingCache<String, Long> AGENT_ID_CACHE = CacheBuilder.newBuilder()
-            .concurrencyLevel(4 * 8).initialCapacity(500).expireAfterAccess(30, TimeUnit.SECONDS)
-            .build(new CacheLoader<String, Long>() {
-
-                @Nonnull
-                @Override
-                public Long load(@Nonnull String key) {
-                    return System.currentTimeMillis();
-                }
-            });
-    /*
-     * for stat
-     */
-    private static final AtomicLong TOTAL_PULSAR_SUCC_SEND_CNT = new AtomicLong(0);
-    private static final AtomicLong TOTAL_PULSAR_SUCC_SEND_SIZE = new AtomicLong(0);
-    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors
-            .newScheduledThreadPool(1, new HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
-
-    static {
-        SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(PULSAR_PERFORMANCE_TASK, 0L,
-                PRINT_INTERVAL, TimeUnit.SECONDS);
-        logger.info("success to start pulsar performance task");
-    }
-
-    private final AtomicLong currentInFlightCount = new AtomicLong(0);
-    private final AtomicLong currentSuccessSendCnt = new AtomicLong(0);
-    private final AtomicLong lastSuccessSendCnt = new AtomicLong(0);
-    private final AtomicInteger processIndex = new AtomicInteger(0);
-
-    private RateLimiter diskRateLimiter;
-    private long t1 = System.currentTimeMillis();
-    private int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT;
-    /*
-     * Control whether the SinkRunner thread can read data from the Channel
-     */
-    private volatile boolean canTake = false;
-    private SinkCounter sinkCounter;
-    /*
-     * message queue and retry
-     */
-    private int eventQueueSize = 10000;
-    private int badEventQueueSize = 10000;
-    private int maxRetrySendCnt = 16;
-    /*
-     * send thread pool
-     */
-    private SinkTask[] sinkThreadPool;
-    private int sinkThreadPoolSize;
-    private PulsarClientService pulsarClientService;
-    /*
-     * statistic info log
-     */
-    private MonitorIndex monitorIndex;
-    private MonitorIndexExt monitorIndexExt;
-
-    /*
-     * metric
-     */
-    private DataProxyMetricItemSet metricItemSet;
-    private ConfigManager configManager;
-    private Map<String, String> topicProperties;
-    private Map<String, String> pulsarCluster;
-    private MQClusterConfig pulsarConfig;
-
-    public PulsarSink() {
-        super();
-        logger.debug("new instance of PulsarSink!");
-    }
-
-    /**
-     * configure
-     */
-    @Override
-    public void configure(Context context) {
-        logger.info("PulsarSink started and context = {}", context.toString());
-        // get maxMonitorCnt's configure value
-        try {
-            maxMonitorCnt = context.getInteger(
-                    ConfigConstants.MAX_MONITOR_CNT, ConfigConstants.DEF_MONITOR_STAT_CNT);
-        } catch (NumberFormatException e) {
-            logger.warn("Property {} must specify an integer value: {}",
-                    ConfigConstants.MAX_MONITOR_CNT, context.getString(ConfigConstants.MAX_MONITOR_CNT));
-        }
-        Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be >= 0");
-        configManager = ConfigManager.getInstance();
-        topicProperties = configManager.getTopicProperties();
-        pulsarCluster = configManager.getMqClusterUrl2Token();
-        pulsarConfig = configManager.getMqClusterConfig(); // pulsar common config
-        sinkThreadPoolSize = pulsarConfig.getThreadNum();
-        if (sinkThreadPoolSize <= 0) {
-            sinkThreadPoolSize = 1;
-        }
-        pulsarClientService = new PulsarClientService(pulsarConfig, sinkThreadPoolSize);
-
-        configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
-
-            @Override
-            public void update() {
-                if (pulsarClientService != null) {
-                    diffSetPublish(pulsarClientService, new HashSet<>(topicProperties.values()),
-                            new HashSet<>(configManager.getTopicProperties().values()));
-                }
-            }
-        });
-        configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
-
-            @Override
-            public void update() {
-                if (pulsarClientService != null) {
-                    diffUpdatePulsarClient(pulsarClientService, pulsarCluster, configManager.getMqClusterUrl2Token());
-                }
-            }
-        });
-        maxRetrySendCnt = pulsarConfig.getMaxRetryCnt();
-        badEventQueueSize = pulsarConfig.getBadEventQueueSize();
-        Preconditions.checkArgument(pulsarConfig.getThreadNum() > 0, "threadNum must be > 0");
-        sinkThreadPool = new SinkTask[sinkThreadPoolSize];
-        eventQueueSize = pulsarConfig.getEventQueueSize();
-        if (pulsarConfig.getDiskIoRatePerSec() != 0) {
-            diskRateLimiter = RateLimiter.create(pulsarConfig.getDiskIoRatePerSec());
-        }
-
-        if (sinkCounter == null) {
-            sinkCounter = new SinkCounter(getName());
-        }
-    }
-
-    private void initTopicSet(PulsarClientService pulsarClientService, Set<String> topicSet) {
-        long startTime = System.currentTimeMillis();
-        if (topicSet != null) {
-            for (String topic : topicSet) {
-                pulsarClientService.initTopicProducer(topic);
-            }
-        }
-        logger.info(getName() + " initTopicSet cost: " + (System.currentTimeMillis() - startTime) + "ms");
-        logger.info(getName() + " producer is ready for topics: " + pulsarClientService.getProducerInfoMap().keySet());
-    }
-
-    /**
-     * When topic.properties is re-enabled, the producer update is triggered
-     */
-    public void diffSetPublish(PulsarClientService pulsarClientService,
-            Set<String> curTopicSet, Set<String> newTopicSet) {
-        boolean changed = false;
-        // create producers for new topics
-        for (String newTopic : newTopicSet) {
-            if (!curTopicSet.contains(newTopic)) {
-                changed = true;
-                try {
-                    pulsarClientService.initTopicProducer(newTopic);
-                } catch (Exception e) {
-                    logger.error("get producer failed: ", e);
-                }
-            }
-        }
-        // remove producers for deleted topics
-        for (String oldTopic : curTopicSet) {
-            if (!newTopicSet.contains(oldTopic)) {
-                changed = true;
-                try {
-                    pulsarClientService.destroyProducerByTopic(oldTopic);
-                } catch (Exception e) {
-                    logger.error("remove producer failed: ", e);
-                }
-            }
-        }
-        if (changed) {
-            topicProperties = configManager.getTopicProperties();
-            logger.info("topics.properties has changed, trigger diff publish for {},"
-                    + " old topic set = {}, new topic set = {}, current topicProperties = {}",
-                    getName(), curTopicSet, newTopicSet, topicProperties);
-        }
-    }
-
-    /**
-     * When pulsarURLList change, close and restart
-     */
-    public void diffUpdatePulsarClient(PulsarClientService pulsarClientService, Map<String, String> originalCluster,
-            Map<String, String> endCluster) {
-        MapDifference<String, String> mapDifference = Maps.difference(originalCluster, endCluster);
-        if (mapDifference.areEqual()) {
-            return;
-        }
-
-        logger.info("pulsarConfig has changed, close unused url clients and start new url clients");
-        Map<String, String> needToClose = new HashMap<>(mapDifference.entriesOnlyOnLeft());
-        Map<String, String> needToStart = new HashMap<>(mapDifference.entriesOnlyOnRight());
-        Map<String, MapDifference.ValueDifference<String>> differentToken = mapDifference.entriesDiffering();
-        for (String url : differentToken.keySet()) {
-            needToClose.put(url, originalCluster.get(url));
-            needToStart.put(url, endCluster.get(url));// token changed
-        }
-
-        pulsarClientService.updatePulsarClients(this, needToClose, needToStart,
-                new HashSet<>(topicProperties.values()));
-
-        pulsarCluster = configManager.getMqClusterUrl2Token();
-        if (!ConfigManager.getInstance().isMqClusterReady()) {
-            ConfigManager.getInstance().updMqClusterStatus(true);
-            logger.info("[{}] MQ Cluster service status ready!", getName());
-        }
-    }
-
-    @Override
-    public void start() {
-        logger.info("[{}] pulsar sink starting...", getName());
-        sinkCounter.start();
-        pulsarClientService.initCreateConnection(this, getName());
-
-        int statIntervalSec = pulsarConfig.getStatIntervalSec();
-        Preconditions.checkArgument(statIntervalSec >= 0, "statIntervalSec must be >= 0");
-        if (statIntervalSec > 0) {
-            // switch for lots of metrics
-            monitorIndex = new MonitorIndex("Pulsar_Sink", statIntervalSec, maxMonitorCnt);
-            monitorIndexExt = new MonitorIndexExt("Pulsar_Sink_monitors#" + this.getName(),
-                    statIntervalSec, maxMonitorCnt);
-        }
-
-        super.start();
-
-        try {
-            initTopicSet(pulsarClientService, new HashSet<String>(topicProperties.values()));
-        } catch (Exception e) {
-            logger.info("pulsar sink start publish topic fail.", e);
-        }
-
-        for (int i = 0; i < sinkThreadPoolSize; i++) {
-            sinkThreadPool[i] = new SinkTask(pulsarClientService, this,
-                    eventQueueSize / sinkThreadPoolSize,
-                    badEventQueueSize / sinkThreadPoolSize, i, true);
-            sinkThreadPool[i].setName(getName() + "_pulsar_sink_sender-" + i);
-            sinkThreadPool[i].start();
-        }
-        // register metricItemSet
-        ConfigManager configManager = ConfigManager.getInstance();
-        String clusterId =
-                configManager.getCommonProperties().getOrDefault(
-                        ConfigConstants.PROXY_CLUSTER_NAME,
-                        ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
-        this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
-        MetricRegister.register(metricItemSet);
-        this.canTake = true;
-        logger.info("[{}] Pulsar sink started", getName());
-    }
-
-    @Override
-    public void stop() {
-        logger.info("pulsar sink stopping");
-        this.canTake = false;
-        int waitCount = 0;
-        while (isAllSendFinished() && waitCount++ < 10) {
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                logger.info("Stop thread has been interrupt!");
-                break;
-            }
-        }
-        if (pulsarConfig.getStatIntervalSec() > 0) {
-            try {
-                monitorIndex.shutDown();
-            } catch (Exception e) {
-                logger.warn("stat runner interrupted");
-            }
-        }
-        if (pulsarClientService != null) {
-            pulsarClientService.close();
-        }
-        if (sinkThreadPool != null) {
-            for (SinkTask thread : sinkThreadPool) {
-                if (thread != null) {
-                    thread.close();
-                    thread.interrupt();
-                }
-            }
-            sinkThreadPool = null;
-        }
-
-        super.stop();
-        if (!SCHEDULED_EXECUTOR_SERVICE.isShutdown()) {
-            SCHEDULED_EXECUTOR_SERVICE.shutdown();
-        }
-        sinkCounter.stop();
-        logger.debug("pulsar sink stopped. Metrics:{}", sinkCounter);
-    }
-
-    private boolean isAllSendFinished() {
-        for (int i = 0; i < sinkThreadPoolSize; i++) {
-            if (!sinkThreadPool[i].isAllSendFinished()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    @Override
-    public Status process() throws EventDeliveryException {
-        if (!this.canTake) {
-            return Status.BACKOFF;
-        }
-
-        Status status = Status.READY;
-        Channel channel = getChannel();
-        Transaction tx = channel.getTransaction();
-        tx.begin();
-        try {
-            Event event = channel.take();
-            if (event != null) {
-                if (diskRateLimiter != null) {
-                    diskRateLimiter.acquire(event.getBody().length);
-                }
-                if (!processEvent(new EventStat(event))) {
-                    logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
-                            + "--> pulsar,Check if pulsar server or network is ok.(if this situation "
-                            + "last long time it will cause memoryChannel full and fileChannel write.)", getName());
-                    tx.rollback();
-                } else {
-                    tx.commit();
-                }
-            } else {
-                status = Status.BACKOFF;
-                tx.commit();
-            }
-        } catch (Throwable t) {
-            logger.error("Process event failed!" + this.getName(), t);
-            try {
-                tx.rollback();
-            } catch (Throwable e) {
-                logger.error("pulsar sink transaction rollback exception", e);
-
-            }
-        } finally {
-            tx.close();
-        }
-        return status;
-    }
-
-    @Override
-    public void handleCreateClientSuccess(String url) {
-        logger.info("createConnection success for url = {}", url);
-        sinkCounter.incrementConnectionCreatedCount();
-    }
-
-    @Override
-    public void handleCreateClientException(String url) {
-        logger.info("createConnection has exception for url = {}", url);
-        sinkCounter.incrementConnectionFailedCount();
-    }
-
-    @Override
-    public void handleMessageSendSuccess(String topic, Object result,
-            EventStat eventStat, long startTime) {
-        /*
-         * Statistics pulsar performance
-         */
-        TOTAL_PULSAR_SUCC_SEND_CNT.incrementAndGet();
-        TOTAL_PULSAR_SUCC_SEND_SIZE.addAndGet(eventStat.getEvent().getBody().length);
-        /*
-         * add to sinkCounter
-         */
-        sinkCounter.incrementEventDrainSuccessCount();
-        currentInFlightCount.decrementAndGet();
-        currentSuccessSendCnt.incrementAndGet();
-        long nowCnt = currentSuccessSendCnt.get();
-        long oldCnt = lastSuccessSendCnt.get();
-        long logEveryNEvents = pulsarConfig.getLogEveryNEvents();
-        Preconditions.checkArgument(logEveryNEvents > 0, "logEveryNEvents must be > 0");
-
-        if (nowCnt % logEveryNEvents == 0 && nowCnt != lastSuccessSendCnt.get()) {
-            lastSuccessSendCnt.set(nowCnt);
-            long t2 = System.currentTimeMillis();
-            logger.info("Pulsar sink {}, succ put {} events to pulsar in the past {} millsec",
-                    getName(), (nowCnt - oldCnt), (t2 - t1));
-            t1 = t2;
-        }
-        addStatistics(eventStat, true, startTime);
-        if (eventStat.isSinkRspType()) {
-            MessageUtils.sinkReturnRspPackage(
-                    (SinkRspEvent) eventStat.getEvent(), DataProxyErrCode.SUCCESS, "");
-        }
-    }
-
-    @Override
-    public void handleMessageSendException(String topic, EventStat eventStat,
-            Object e, DataProxyErrCode errCode, String errMsg) {
-        // decrease inflight count
-        currentInFlightCount.decrementAndGet();
-        // check whether retry send message
-        boolean needRetry = true;
-        if (e instanceof NotFoundException) {
-            logger.error("NotFoundException for topic " + topic + ", message will be discard!", e);
-            needRetry = false;
-        } else if (e instanceof TooLongFrameException) {
-            logger.error("TooLongFrameException, send failed for " + getName(), e);
-        } else if (e instanceof ProducerQueueIsFullError) {
-            logger.error("ProducerQueueIsFullError, send failed for " + getName(), e);
-        } else if (!(e instanceof AlreadyClosedException
-                || e instanceof PulsarClientException.NotConnectedException
-                || e instanceof TopicTerminatedException)) {
-            if (logPrinterB.shouldPrint()) {
-                logger.error("send failed for " + getName(), e);
-            }
-        }
-        addStatistics(eventStat, false, 0);
-        if (eventStat.isSinkRspType()) {
-            MessageUtils.sinkReturnRspPackage(
-                    (SinkRspEvent) eventStat.getEvent(), errCode, errMsg);
-        } else {
-            eventStat.incRetryCnt();
-            if (needRetry) {
-                processResendEvent(eventStat);
-            }
-        }
-    }
-
-    @Override
-    public void handleRequestProcError(String topic, EventStat eventStat, boolean needRetry,
-            DataProxyErrCode errCode, String errMsg) {
-        if (logPrinterB.shouldPrint()) {
-            logger.error(errMsg);
-        }
-        addStatistics(eventStat, false, 0);
-        if (MessageUtils.isSinkRspType(eventStat.getEvent())) {
-            MessageUtils.sinkReturnRspPackage(
-                    (SinkRspEvent) eventStat.getEvent(), errCode, errMsg);
-        } else {
-            eventStat.incRetryCnt();
-            if (needRetry) {
-                processResendEvent(eventStat);
-            }
-        }
-    }
-
-    /**
-     * Add statistics information
-     *
-     * @param eventStat   the statistic event
-     * @param isSuccess  is processed successfully
-     * @param sendTime   the send time when success processed
-     */
-    private void addStatistics(EventStat eventStat, boolean isSuccess, long sendTime) {
-        if (eventStat == null || eventStat.getEvent() == null) {
-            return;
-        }
-        Event event = eventStat.getEvent();
-        // add jmx metric items;
-        PulsarSink.this.metricItemSet.fillSinkSendMetricItemsByEvent(
-                event, sendTime, isSuccess, event.getBody().length);
-        if (isSuccess) {
-            AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
-        }
-        if (pulsarConfig.getStatIntervalSec() <= 0) {
-            return;
-        }
-        // add monitor items base file storage
-        String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
-        String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
-        String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
-        int intMsgCnt = Integer.parseInt(
-                event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
-        long dataTimeL = Long.parseLong(
-                event.getHeaders().get(AttributeConstants.DATA_TIME));
-        Pair<Boolean, String> evenProcType =
-                MessageUtils.getEventProcType(event);
-        // build statistic key
-        StringBuilder newBase = new StringBuilder(512)
-                .append(getName()).append(SEP_HASHTAG).append(topic)
-                .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
-                .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
-                .append(SEP_HASHTAG).append(evenProcType.getRight()).append(SEP_HASHTAG)
-                .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
-        // count data
-        if (isSuccess) {
-            monitorIndex.addAndGet(newBase.toString(),
-                    intMsgCnt, 1, event.getBody().length, 0);
-            monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");
-        } else {
-            monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP");
-            monitorIndex.addAndGet(newBase.toString(),
-                    0, 0, 0, intMsgCnt);
-        }
-    }
-
-    private boolean processEvent(EventStat eventStat) {
-        if (eventStat == null || eventStat.getEvent() == null) {
-            return true;
-        }
-
-        boolean result = true;
-        Event event = eventStat.getEvent();
-        if (eventStat.isOrderMessage()) {
-            String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
-            SinkTask sinkTask =
-                    sinkThreadPool[Math.abs(partitionKey.hashCode()) % sinkThreadPoolSize];
-            result = sinkTask.processEvent(eventStat);
-        } else {
-            int num = 0;
-            do {
-                int index = processIndex.getAndIncrement();
-                SinkTask sinkTask = sinkThreadPool[index % sinkThreadPoolSize];
-                if (sinkTask != null) {
-                    result = sinkTask.processEvent(eventStat);
-                    if (result) {
-                        break;
-                    }
-                }
-                num++;
-            } while (num < sinkThreadPoolSize);
-        }
-        return result;
-    }
-
-    private void processResendEvent(EventStat eventStat) {
-        try {
-            if (eventStat == null || eventStat.getEvent() == null) {
-                logger.warn("processResendEvent eventStat is null!");
-                return;
-            }
-            /*
-             * If the failure requires retransmission to pulsar, the sid needs to be removed before retransmission.
-             */
-            if (pulsarConfig.getClientIdCache()) {
-                String clientId = eventStat.getEvent().getHeaders().get(ConfigConstants.SEQUENCE_ID);
-                if (clientId != null && AGENT_ID_CACHE.asMap().containsKey(clientId)) {
-                    AGENT_ID_CACHE.invalidate(clientId);
-                }
-            }
-            boolean result = false;
-            int num = 0;
-            do {
-                int index = processIndex.getAndIncrement();
-                SinkTask sinkTask = sinkThreadPool[index % sinkThreadPoolSize];
-                if (sinkTask != null) {
-                    result = sinkTask.processReSendEvent(eventStat);
-                    if (result) {
-                        break;
-                    }
-                }
-                num++;
-            } while (num < sinkThreadPoolSize);
-            if (!result) {
-                FailoverChannelProcessorHolder.getChannelProcessor()
-                        .processEvent(eventStat.getEvent());
-                if (logPrinterC.shouldPrint()) {
-                    logger.error(getName() + " Channel --> pulsar --> ResendQueue(full) "
-                            + "-->FailOverChannelProcessor(current code point), "
-                            + "Resend queue is full,Check if pulsar server or network is ok.");
-                }
-            }
-        } catch (Throwable throwable) {
-            if (pulsarConfig.getStatIntervalSec() > 0) {
-                monitorIndexExt.incrementAndGet("PULSAR_SINK_DROPPED");
-            }
-            if (logPrinterC.shouldPrint()) {
-                logger.error(getName() + " Discard msg because put events to both of "
-                        + "queue and fileChannel fail", throwable);
-            }
-        }
-    }
-
-    public LoadingCache<String, Long> getAgentIdCache() {
-        return AGENT_ID_CACHE;
-    }
-
-    public Map<String, String> getTopicsProperties() {
-        return topicProperties;
-    }
-
-    public SinkCounter getSinkCounter() {
-        return sinkCounter;
-    }
-
-    public AtomicLong getCurrentInFlightCount() {
-        return currentInFlightCount;
-    }
-
-    public MQClusterConfig getPulsarConfig() {
-        return pulsarConfig;
-    }
-
-    public int getMaxRetrySendCnt() {
-        return maxRetrySendCnt;
-    }
-
-    static class PulsarPerformanceTask implements Runnable {
-
-        @Override
-        public void run() {
-            try {
-                if (TOTAL_PULSAR_SUCC_SEND_SIZE.get() != 0) {
-                    logger.info("Total pulsar performance tps :"
-                            + TOTAL_PULSAR_SUCC_SEND_CNT.get() / PRINT_INTERVAL
-                            + "/s, avg msg size:"
-                            + TOTAL_PULSAR_SUCC_SEND_SIZE.get() / TOTAL_PULSAR_SUCC_SEND_CNT.get()
-                            + ",print every " + PRINT_INTERVAL + " seconds");
-                    // totalPulsarSuccSendCnt represents the number of packets
-                    TOTAL_PULSAR_SUCC_SEND_CNT.set(0);
-                    TOTAL_PULSAR_SUCC_SEND_SIZE.set(0);
-                }
-
-            } catch (Exception e) {
-                logger.info("pulsarPerformanceTask error", e);
-            }
-        }
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
deleted file mode 100644
index f8cbece8b..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
+++ /dev/null
@@ -1,714 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.flume.source.shaded.guava.RateLimiter;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.util.NetworkUtils;
-import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
-import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
-import org.apache.inlong.dataproxy.sink.common.TubeUtils;
-import org.apache.inlong.dataproxy.utils.Constants;
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
-import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
-import org.apache.inlong.tubemq.client.producer.MessageProducer;
-import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
-import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
-import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class SimpleMessageTubeSink extends AbstractSink implements Configurable {
-
-    private static final Logger logger = LoggerFactory.getLogger(SimpleMessageTubeSink.class);
-    private static int MAX_TOPICS_EACH_PRODUCER_HOLD = 200;
-    private static final String TUBE_REQUEST_TIMEOUT = "tube-request-timeout";
-    private static final String KEY_DISK_IO_RATE_PER_SEC = "disk-io-rate-per-sec";
-
-    private static int BAD_EVENT_QUEUE_SIZE = 10000;
-
-    private static final String SINK_THREAD_NUM = "thread-num";
-    private static int EVENT_QUEUE_SIZE = 1000;
-    private volatile boolean canTake = false;
-    private volatile boolean canSend = false;
-    private static int BATCH_SIZE = 10000;
-    private static final int defaultRetryCnt = -1;
-    private static final int defaultLogEveryNEvents = 100000;
-    private static final int defaultSendTimeout = 20000; // in millsec
-    private static final int defaultStatIntervalSec = 60;
-    private static final int sendNewMetricRetryCount = 3;
-
-    private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
-    private static String TOPIC = "topic";
-    private static String SEND_TIMEOUT = "send_timeout"; // in millsec
-    private static String LOG_EVERY_N_EVENTS = "log-every-n-events";
-    private static String RETRY_CNT = "retry-currentSuccSendedCnt";
-    private static String STAT_INTERVAL_SEC = "stat-interval-sec"; // in sec
-    private static String MAX_TOPICS_EACH_PRODUCER_HOLD_NAME = "max-topic-each-producer-hold";
-
-    private static final String LOG_TOPIC = "proxy-log-topic";
-    private static final String STREAMID = "proxy-log-streamid";
-    private static final String GROUPID = "proxy-log-groupid";
-    private static final String SEND_REMOTE = "send-remote";
-    private static final String topicsFilePath = "topics.properties";
-    private static final String slaTopicFilePath = "slaTopics.properties";
-    private static final String SLA_METRIC_SINK = "sla-metric-sink";
-
-    private static String MAX_SURVIVED_TIME = "max-survived-time";
-    private static String MAX_SURVIVED_SIZE = "max-survived-size";
-    private static String CLIENT_ID_CACHE = "client-id-cache";
-
-    private String proxyLogTopic = "teg_manager";
-    private String proxyLogGroupId = "b_teg_manager";
-    private String proxyLogStreamId = "proxy_measure_log";
-    private boolean sendRemote = false;
-    private ConfigManager configManager;
-    private Map<String, String> topicProperties;
-
-    public MessageProducer producer;
-    public Map<String, MessageProducer> producerMap;
-
-    private LinkedBlockingQueue<EventStat> resendQueue;
-    private LinkedBlockingQueue<Event> eventQueue;
-
-    private long diskIORatePerSec;
-    private RateLimiter diskRateLimiter;
-
-    public AtomicInteger currentPublishTopicNum = new AtomicInteger(0);
-    public TubeMultiSessionFactory sessionFactory;
-    private String masterHostAndPortList;
-    private Integer logEveryNEvents;
-    private Integer sendTimeout;
-    private static int retryCnt = defaultRetryCnt;
-    private int requestTimeout = 60;
-    private int threadNum;
-    private Thread[] sinkThreadPool;
-
-    private String metaTopicFilePath = topicsFilePath;
-    private long linkMaxAllowedDelayedMsgCount;
-    private long sessionWarnDelayedMsgCount;
-    private long sessionMaxAllowedDelayedMsgCount;
-    private long nettyWriteBufferHighWaterMark;
-    private int recoverthreadcount;
-    //
-    private Map<String, String> dimensions;
-    private DataProxyMetricItemSet metricItemSet;
-    private static final MsgDedupHandler msgDedupHandler = new MsgDedupHandler();
-    private static ConcurrentHashMap<String, Long> illegalTopicMap =
-            new ConcurrentHashMap<String, Long>();
-
-    private boolean overflow = false;
-
-    /**
-     * diff publish
-     *
-     * @param originalSet
-     * @param endSet
-     */
-    public void diffSetPublish(Set<String> originalSet, Set<String> endSet) {
-
-        boolean changed = false;
-        for (String s : endSet) {
-            if (!originalSet.contains(s)) {
-                changed = true;
-                try {
-                    producer = getProducer(s);
-                } catch (Exception e) {
-                    logger.error("Get producer failed!", e);
-                }
-            }
-        }
-
-        if (changed) {
-            logger.info("topics.properties has changed, trigger diff publish for {}", getName());
-            topicProperties = configManager.getTopicProperties();
-        }
-    }
-
-    private MessageProducer getProducer(String topic) throws TubeClientException {
-        if (producerMap.containsKey(topic)) {
-            return producerMap.get(topic);
-        } else {
-            synchronized (this) {
-                if (!producerMap.containsKey(topic)) {
-                    if (producer == null || currentPublishTopicNum.get() >= MAX_TOPICS_EACH_PRODUCER_HOLD) {
-                        producer = sessionFactory.createProducer();
-                        currentPublishTopicNum.set(0);
-                    }
-                    // publish topic
-                    producer.publish(topic);
-                    producerMap.put(topic, producer);
-                    currentPublishTopicNum.incrementAndGet();
-                }
-            }
-            return producerMap.get(topic);
-        }
-    }
-
-    private TubeClientConfig initTubeConfig() throws Exception {
-        final TubeClientConfig tubeClientConfig = new TubeClientConfig(NetworkUtils.getLocalIp(),
-                this.masterHostAndPortList);
-        tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(linkMaxAllowedDelayedMsgCount);
-        tubeClientConfig.setSessionWarnDelayedMsgCount(sessionWarnDelayedMsgCount);
-        tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(sessionMaxAllowedDelayedMsgCount);
-        tubeClientConfig.setNettyWriteBufferHighWaterMark(nettyWriteBufferHighWaterMark);
-        tubeClientConfig.setHeartbeatPeriodMs(15000L);
-        tubeClientConfig.setRpcTimeoutMs(20000L);
-
-        return tubeClientConfig;
-    }
-
-    /**
-     * If this function is called successively without calling {@see #destroyConnection()}, only the
-     * first call has any effect.
-     *
-     * @throws FlumeException if an RPC client connection could not be opened
-     */
-    private void createConnection() throws FlumeException {
-        // synchronized (tubeSessionLock) {
-        // if already connected, just skip
-        if (sessionFactory != null) {
-            return;
-        }
-
-        try {
-            TubeClientConfig conf = initTubeConfig();
-            // sessionFactory = new TubeMutilMessageSessionFactory(conf);
-            sessionFactory = new TubeMultiSessionFactory(conf);
-        } catch (TubeClientException e) {
-            logger.error("create connnection error in metasink, "
-                    + "maybe tube master set error, please re-check. ex1 {}", e.getMessage());
-            throw new FlumeException("connect to Tube error1, "
-                    + "maybe zkstr/zkroot set error, please re-check");
-        } catch (Throwable e) {
-            logger.error("create connnection error in metasink, "
-                    + "maybe tube master set error/shutdown in progress, please re-check. ex2 {}",
-                    e.getMessage());
-            throw new FlumeException("connect to meta error2, "
-                    + "maybe tube master set error/shutdown in progress, please re-check");
-        }
-
-        if (producerMap == null) {
-            producerMap = new HashMap<String, MessageProducer>();
-        }
-        logger.debug("building tube producer");
-        // }
-    }
-
-    private void destroyConnection() {
-        for (Map.Entry<String, MessageProducer> entry : producerMap.entrySet()) {
-            MessageProducer producer = entry.getValue();
-            try {
-                producer.shutdown();
-            } catch (TubeClientException e) {
-                logger.error("destroy producer error in metasink, MetaClientException {}", e.getMessage());
-            } catch (Throwable e) {
-                logger.error("destroy producer error in metasink, ex {}", e.getMessage());
-            }
-        }
-        producerMap.clear();
-
-        if (sessionFactory != null) {
-            try {
-                sessionFactory.shutdown();
-            } catch (TubeClientException e) {
-                logger.error("destroy sessionFactory error in metasink, MetaClientException {}",
-                        e.getMessage());
-            } catch (Exception e) {
-                logger.error("destroy sessionFactory error in metasink, ex {}", e.getMessage());
-            }
-        }
-        sessionFactory = null;
-        logger.debug("closed meta producer");
-    }
-
-    private void initTopicSet(Set<String> topicSet) throws Exception {
-        List<String> sortedList = new ArrayList(topicSet);
-        Collections.sort(sortedList);
-        int cycle = sortedList.size() / MAX_TOPICS_EACH_PRODUCER_HOLD;
-        int remainder = sortedList.size() % MAX_TOPICS_EACH_PRODUCER_HOLD;
-        long startTime = System.currentTimeMillis();
-        for (int i = 0; i <= cycle; i++) {
-            Set<String> subset = new HashSet<String>();
-            int startIndex = i * MAX_TOPICS_EACH_PRODUCER_HOLD;
-            int endIndex = startIndex + MAX_TOPICS_EACH_PRODUCER_HOLD - 1;
-            if (i == cycle) {
-                if (remainder == 0) {
-                    continue;
-                } else {
-                    endIndex = startIndex + remainder - 1;
-                }
-            }
-            for (int index = startIndex; index <= endIndex; index++) {
-                subset.add(sortedList.get(index));
-            }
-            producer = sessionFactory.createProducer();
-            try {
-                Set<String> succTopicSet = producer.publish(subset);
-                if (succTopicSet != null) {
-                    for (String succTopic : succTopicSet) {
-                        producerMap.put(succTopic, producer);
-                    }
-                    currentPublishTopicNum.set(succTopicSet.size());
-                    logger.info(getName() + " success Subset  : " + succTopicSet);
-                }
-            } catch (Exception e) {
-                logger.info(getName() + " meta sink initTopicSet fail.", e);
-            }
-        }
-        logger.info(getName() + " initTopicSet cost: " + (System.currentTimeMillis() - startTime) + "ms");
-        logger.info(getName() + " producer is ready for topics : " + producerMap.keySet());
-    }
-
-    @Override
-    public void start() {
-        this.dimensions = new HashMap<>();
-        this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
-        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
-        // register metrics
-        this.metricItemSet = new DataProxyMetricItemSet(this.getName());
-        MetricRegister.register(metricItemSet);
-
-        // create tube connection
-        try {
-            createConnection();
-        } catch (FlumeException e) {
-            logger.error("Unable to create tube client" + ". Exception follows.", e);
-
-            /* Try to prevent leaking resources. */
-            destroyConnection();
-
-            /* FIXME: Mark ourselves as failed. */
-            stop();
-            return;
-        }
-
-        super.start();
-        this.canSend = true;
-        this.canTake = true;
-
-        try {
-            initTopicSet(new HashSet<String>(topicProperties.values()));
-        } catch (Exception e) {
-            logger.info("meta sink start publish topic fail.", e);
-        }
-
-        for (int i = 0; i < sinkThreadPool.length; i++) {
-            sinkThreadPool[i] = new Thread(new SinkTask(), getName() + "_tube_sink_sender-" + i);
-            sinkThreadPool[i].start();
-        }
-
-    }
-
-    class SinkTask implements Runnable {
-
-        private void sendMessage(Event event, String topic, AtomicBoolean flag, EventStat es)
-                throws TubeClientException, InterruptedException {
-            if (msgDedupHandler.judgeDupAndPutMsgSeqId(
-                    event.getHeaders().get(ConfigConstants.SEQUENCE_ID))) {
-                logger.info("{} agent package {} existed,just discard.",
-                        getName(), event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
-            } else {
-                producer.sendMessage(TubeUtils.buildMessage(topic, event), new MyCallback(es));
-                flag.set(true);
-            }
-            illegalTopicMap.remove(topic);
-        }
-
-        private void handleException(Throwable t, String topic, boolean decrementFlag, EventStat es) {
-            if (t instanceof TubeClientException) {
-                String message = t.getMessage();
-                if (message != null && (message.contains("No available queue for topic")
-                        || message.contains("The brokers of topic are all forbidden"))) {
-                    illegalTopicMap.put(topic, System.currentTimeMillis() + 60 * 1000);
-                    logger.info("IllegalTopicMap.put " + topic);
-                    return;
-                } else {
-                    try {
-                        Thread.sleep(100);
-                    } catch (InterruptedException e) {
-                        // ignore..
-                    }
-                }
-            }
-            logger.error("Sink task fail to send the message, decrementFlag=" + decrementFlag + ",sink.name="
-                    + Thread.currentThread().getName()
-                    + ",event.headers=" + es.getEvent().getHeaders(), t);
-        }
-
-        @Override
-        public void run() {
-            logger.info("Sink task {} started.", Thread.currentThread().getName());
-            while (canSend) {
-                boolean decrementFlag = false;
-                boolean resendBadEvent = false;
-                Event event = null;
-                EventStat es = null;
-                String topic = null;
-                try {
-                    if (SimpleMessageTubeSink.this.overflow) {
-                        SimpleMessageTubeSink.this.overflow = false;
-                        Thread.sleep(10);
-                    }
-                    if (!resendQueue.isEmpty()) {
-                        es = resendQueue.poll();
-                        if (es != null) {
-                            event = es.getEvent();
-                            // logger.warn("Resend event: {}", event.toString());
-                            if (event.getHeaders().containsKey(TOPIC)) {
-                                topic = event.getHeaders().get(TOPIC);
-                            }
-                            resendBadEvent = true;
-                        }
-                    } else {
-                        event = eventQueue.take();
-                        es = new EventStat(event);
-                        // sendCnt.incrementAndGet();
-                        if (event.getHeaders().containsKey(TOPIC)) {
-                            topic = event.getHeaders().get(TOPIC);
-                        }
-                    }
-
-                    if (event == null) {
-                        // ignore event is null, when multiple-thread SinkTask running
-                        // this null value comes from resendQueue
-                        continue;
-                    }
-
-                    if (topic == null || topic.equals("")) {
-                        logger.warn("no topic specified in event header, just skip this event");
-                        continue;
-                    }
-
-                    Long expireTime = illegalTopicMap.get(topic);
-                    if (expireTime != null) {
-                        long currentTime = System.currentTimeMillis();
-                        if (expireTime > currentTime) {
-
-                            // TODO: need to be improved.
-                            // reChannelEvent(es, topic);
-                            continue;
-                        } else {
-
-                            illegalTopicMap.remove(topic);
-                        }
-                    }
-                    MessageProducer producer = null;
-                    try {
-                        producer = getProducer(topic);
-                    } catch (Exception e) {
-                        logger.error("Get producer failed!", e);
-                    }
-
-                    if (producer == null) {
-                        illegalTopicMap.put(topic, System.currentTimeMillis() + 30 * 1000);
-                        continue;
-                    }
-
-                    AtomicBoolean flagAtomic = new AtomicBoolean(decrementFlag);
-                    sendMessage(event, topic, flagAtomic, es);
-                    decrementFlag = flagAtomic.get();
-
-                } catch (InterruptedException e) {
-                    logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
-                    return;
-                } catch (Throwable t) {
-                    handleException(t, topic, decrementFlag, es);
-                    resendEvent(es, decrementFlag);
-                }
-            }
-        }
-    }
-
-    public class MyCallback implements MessageSentCallback {
-
-        private EventStat myEventStat;
-        private long sendTime;
-
-        public MyCallback(EventStat eventStat) {
-            this.myEventStat = eventStat;
-            this.sendTime = System.currentTimeMillis();
-        }
-
-        @Override
-        public void onMessageSent(final MessageSentResult result) {
-            if (result.isSuccess()) {
-                // TODO: add stats
-                this.addMetric(myEventStat.getEvent(), true, sendTime);
-            } else {
-                this.addMetric(myEventStat.getEvent(), false, 0);
-                if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) {
-                    logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
-                            result.getErrMsg(), resendQueue.size(),
-                            myEventStat.getEvent().hashCode());
-
-                    return;
-                }
-                if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW) {
-                    logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
-                            result.getErrMsg(), resendQueue.size(),
-                            myEventStat.getEvent().hashCode());
-                }
-                resendEvent(myEventStat, true);
-            }
-        }
-
-        /**
-         * addMetric
-         * 
-         * @param event
-         * @param result
-         * @param sendTime
-         */
-        private void addMetric(Event event, boolean result, long sendTime) {
-            Map<String, String> dimensions = new HashMap<>();
-            dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, SimpleMessageTubeSink.this.getName());
-            dimensions.put(DataProxyMetricItem.KEY_SINK_ID, SimpleMessageTubeSink.this.getName());
-            dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, ""));
-            DataProxyMetricItem.fillInlongId(event, dimensions);
-            DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
-
-            DataProxyMetricItem metricItem = SimpleMessageTubeSink.this.metricItemSet.findMetricItem(dimensions);
-            if (result) {
-                metricItem.sendSuccessCount.incrementAndGet();
-                metricItem.sendSuccessSize.addAndGet(event.getBody().length);
-                AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
-                if (sendTime > 0) {
-                    long currentTime = System.currentTimeMillis();
-                    long msgTime = NumberUtils.toLong(event.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
-                            sendTime);
-                    long sinkDuration = currentTime - sendTime;
-                    long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
-                    long wholeDuration = currentTime - msgTime;
-                    metricItem.sinkDuration.addAndGet(sinkDuration);
-                    metricItem.nodeDuration.addAndGet(nodeDuration);
-                    metricItem.wholeDuration.addAndGet(wholeDuration);
-                }
-            } else {
-                metricItem.sendFailCount.incrementAndGet();
-                metricItem.sendFailSize.addAndGet(event.getBody().length);
-            }
-        }
-
-        @Override
-        public void onException(final Throwable e) {
-            Throwable t = e;
-            while (t.getCause() != null) {
-                t = t.getCause();
-            }
-            if (t instanceof OverflowException) {
-                SimpleMessageTubeSink.this.overflow = true;
-            }
-            resendEvent(myEventStat, true);
-        }
-    }
-
-    /**
-     * resend event
-     *
-     * @param es
-     * @param isDecrement
-     */
-    private void resendEvent(EventStat es, boolean isDecrement) {
-        try {
-            if (es == null || es.getEvent() == null) {
-                return;
-            }
-            msgDedupHandler.invalidMsgSeqId(
-                    es.getEvent().getHeaders().get(ConfigConstants.SEQUENCE_ID));
-        } catch (Throwable throwable) {
-            logger.error(getName() + " Discard msg because put events to both of queue and "
-                    + "fileChannel fail,current resendQueue.size = "
-                    + resendQueue.size(), throwable);
-        }
-    }
-
-    @Override
-    public Status process() throws EventDeliveryException {
-        if (!this.canTake) {
-            return Status.BACKOFF;
-        }
-        Status status = Status.READY;
-        Channel channel = getChannel();
-        Transaction tx = channel.getTransaction();
-        tx.begin();
-        try {
-            Event event = channel.take();
-            if (event != null) {
-                if (diskRateLimiter != null) {
-                    diskRateLimiter.acquire(event.getBody().length);
-                }
-                if (!eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
-                    logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
-                            + "--> Tube,Check if Tube server or network is ok.(if this situation last long time "
-                            + "it will cause memoryChannel full and fileChannel write.)", getName());
-                    tx.rollback();
-                } else {
-                    tx.commit();
-                    // metric
-                    if (event.getHeaders().containsKey(TOPIC)) {
-                        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().get(TOPIC));
-                    } else {
-                        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
-                    }
-                    DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
-                    metricItem.readSuccessCount.incrementAndGet();
-                    metricItem.readSuccessSize.addAndGet(event.getBody().length);
-                }
-            } else {
-
-                // logger.info("[{}]No data to process in the channel.",getName());
-                status = Status.BACKOFF;
-                tx.commit();
-            }
-        } catch (Throwable t) {
-            logger.error("Process event failed!" + this.getName(), t);
-            try {
-                tx.rollback();
-            } catch (Throwable e) {
-                logger.error("metasink transaction rollback exception", e);
-
-            }
-        } finally {
-            tx.close();
-        }
-        return status;
-    }
-
-    @Override
-    public void configure(Context context) {
-        logger.info(context.toString());
-        // logger.info("sinktest:"+getName()+getChannel());//sinktest:meta-sink-msg2null
-
-        configManager = ConfigManager.getInstance();
-        topicProperties = configManager.getTopicProperties();
-        configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
-
-            @Override
-            public void update() {
-
-                diffSetPublish(new HashSet<String>(topicProperties.values()),
-                        new HashSet<String>(configManager.getTopicProperties().values()));
-            }
-        });
-
-        masterHostAndPortList = context.getString(MASTER_HOST_PORT_LIST);
-        Preconditions.checkState(masterHostAndPortList != null, "No master and port list specified");
-
-        producerMap = new HashMap<String, MessageProducer>();
-
-        logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS, defaultLogEveryNEvents);
-        logger.debug(this.getName() + " " + LOG_EVERY_N_EVENTS + " " + logEveryNEvents);
-        Preconditions.checkArgument(logEveryNEvents > 0, "logEveryNEvents must be > 0");
-
-        sendTimeout = context.getInteger(SEND_TIMEOUT, defaultSendTimeout);
-        logger.debug(this.getName() + " " + SEND_TIMEOUT + " " + sendTimeout);
-        Preconditions.checkArgument(sendTimeout > 0, "sendTimeout must be > 0");
-
-        MAX_TOPICS_EACH_PRODUCER_HOLD = context.getInteger(MAX_TOPICS_EACH_PRODUCER_HOLD_NAME, 200);
-        retryCnt = context.getInteger(RETRY_CNT, defaultRetryCnt);
-        logger.debug(this.getName() + " " + RETRY_CNT + " " + retryCnt);
-
-        boolean isSlaMetricSink = context.getBoolean(SLA_METRIC_SINK, false);
-        if (isSlaMetricSink) {
-            this.metaTopicFilePath = slaTopicFilePath;
-        }
-        // start message deduplication handler
-        msgDedupHandler.start(context.getBoolean(CLIENT_ID_CACHE, false),
-                context.getInteger(MAX_SURVIVED_TIME, -1),
-                context.getInteger(MAX_SURVIVED_SIZE, -1));
-
-        String requestTimeout = context.getString(TUBE_REQUEST_TIMEOUT);
-        if (requestTimeout != null) {
-            this.requestTimeout = Integer.parseInt(requestTimeout);
-        }
-
-        String sendRemoteStr = context.getString(SEND_REMOTE);
-        if (sendRemoteStr != null) {
-            sendRemote = Boolean.parseBoolean(sendRemoteStr);
-        }
-        if (sendRemote) {
-            proxyLogTopic = context.getString(LOG_TOPIC, proxyLogTopic);
-            proxyLogGroupId = context.getString(GROUPID, proxyLogStreamId);
-            proxyLogStreamId = context.getString(STREAMID, proxyLogStreamId);
-        }
-
-        resendQueue = new LinkedBlockingQueue<>(BAD_EVENT_QUEUE_SIZE);
-
-        String sinkThreadNum = context.getString(SINK_THREAD_NUM, "4");
-        threadNum = Integer.parseInt(sinkThreadNum);
-        Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
-        sinkThreadPool = new Thread[threadNum];
-        eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_SIZE);
-
-        diskIORatePerSec = context.getLong(KEY_DISK_IO_RATE_PER_SEC, 0L);
-        if (diskIORatePerSec != 0) {
-            diskRateLimiter = RateLimiter.create(diskIORatePerSec);
-        }
-
-        linkMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT,
-                80000L);
-        sessionWarnDelayedMsgCount = context.getLong(ConfigConstants.SESSION_WARN_DELAYED_MSG_COUNT,
-                2000000L);
-        sessionMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT,
-                4000000L);
-        nettyWriteBufferHighWaterMark = context.getLong(ConfigConstants.NETTY_WRITE_BUFFER_HIGH_WATER_MARK,
-                15 * 1024 * 1024L);
-        recoverthreadcount = context.getInteger(ConfigConstants.RECOVER_THREAD_COUNT,
-                Runtime.getRuntime().availableProcessors() + 1);
-    }
-
-    /**
-     * get metricItemSet
-     * @return the metricItemSet
-     */
-    public DataProxyMetricItemSet getMetricItemSet() {
-        return metricItemSet;
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
deleted file mode 100644
index 88da9d079..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ /dev/null
@@ -1,707 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink;
-
-import static org.apache.inlong.dataproxy.consts.AttrConstants.SEP_HASHTAG;
-
-import com.google.common.base.Preconditions;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.collections.SetUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.flume.source.shaded.guava.RateLimiter;
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.monitor.LogCounter;
-import org.apache.inlong.common.monitor.MonitorIndex;
-import org.apache.inlong.common.monitor.MonitorIndexExt;
-import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.common.util.NetworkUtils;
-import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
-import org.apache.inlong.dataproxy.base.SinkRspEvent;
-import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
-import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
-import org.apache.inlong.dataproxy.sink.common.TubeProducerHolder;
-import org.apache.inlong.dataproxy.sink.common.TubeUtils;
-import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
-import org.apache.inlong.tubemq.client.producer.MessageProducer;
-import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
-import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TubeSink extends AbstractSink implements Configurable {
-
-    private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
-    private static final MsgDedupHandler MSG_DEDUP_HANDLER = new MsgDedupHandler();
-    private TubeProducerHolder producerHolder = null;
-    private volatile boolean canSend = false;
-    private volatile boolean isOverFlow = false;
-    private ConfigManager configManager;
-    private Map<String, String> topicProperties;
-    private MQClusterConfig tubeConfig;
-    private String usedMasterAddr = null;
-    private Set<String> masterHostAndPortLists;
-    // statistic info log
-    private int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT;
-    private int statIntervalSec = 60;
-    private MonitorIndex monitorIndex;
-    private MonitorIndexExt monitorIndexExt;
-    private static final String KEY_SINK_DROPPED = "TUBE_SINK_DROPPED";
-    private static final String KEY_SINK_SUCCESS = "TUBE_SINK_SUCCESS";
-    private static final String KEY_SINK_FAILURE = "TUBE_SINK_FAILURE";
-    private static final String KEY_SINK_EXP = "TUBE_SINK_EXP";
-    // used for RoundRobin different cluster while send message
-    private RateLimiter diskRateLimiter;
-    private Thread[] sinkThreadPool;
-    private DataProxyMetricItemSet metricItemSet;
-    private final AtomicBoolean started = new AtomicBoolean(false);
-    private static final LogCounter LOG_SINK_TASK_PRINTER =
-            new LogCounter(10, 100000, 60 * 1000);
-    private LinkedBlockingQueue<Event> eventQueue;
-    private LinkedBlockingQueue<EventStat> resendQueue;
-    private final AtomicLong cachedMsgCnt = new AtomicLong(0);
-    private final AtomicLong takenMsgCnt = new AtomicLong(0);
-    private final AtomicLong resendMsgCnt = new AtomicLong(0);
-    private final AtomicLong blankTopicDiscardMsgCnt = new AtomicLong(0);
-    private final AtomicLong frozenTopicDiscardMsgCnt = new AtomicLong(0);
-    private final AtomicLong dupDiscardMsgCnt = new AtomicLong(0);
-    private final AtomicLong inflightMsgCnt = new AtomicLong(0);
-    private final AtomicLong successMsgCnt = new AtomicLong(0);
-    // statistic thread
-    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors
-            .newScheduledThreadPool(1, new HighPriorityThreadFactory("tubeSink-Printer-thread"));
-
-    {
-        SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(new TubeStatsTask(), 30L,
-                60L, TimeUnit.SECONDS);
-        logger.info("success to start performance statistic task!");
-    }
-
-    @Override
-    public void configure(Context context) {
-        logger.info(getName() + " configure from context: {}", context);
-        // initial parameters
-        configManager = ConfigManager.getInstance();
-        tubeConfig = configManager.getMqClusterConfig();
-        topicProperties = configManager.getTopicProperties();
-        masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet();
-        // only use first cluster address now
-        usedMasterAddr = getFirstClusterAddr(masterHostAndPortLists);
-        // create producer holder
-        if (usedMasterAddr != null) {
-            producerHolder = new TubeProducerHolder(getName(),
-                    usedMasterAddr, configManager.getMqClusterConfig());
-        }
-        // start message deduplication handler
-        MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(),
-                tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize());
-        // get maxMonitorCnt's configure value
-        try {
-            maxMonitorCnt = context.getInteger(
-                    ConfigConstants.MAX_MONITOR_CNT, ConfigConstants.DEF_MONITOR_STAT_CNT);
-        } catch (NumberFormatException e) {
-            logger.warn("Property {} must specify an integer value: {}",
-                    ConfigConstants.MAX_MONITOR_CNT, context.getString(ConfigConstants.MAX_MONITOR_CNT));
-        }
-        Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be >= 0");
-        statIntervalSec = tubeConfig.getStatIntervalSec();
-        Preconditions.checkArgument(statIntervalSec >= 0, "statIntervalSec must be >= 0");
-        // initial TubeMQ configure
-        // initial resend queue size
-        int badEventQueueSize = tubeConfig.getBadEventQueueSize();
-        Preconditions.checkArgument(badEventQueueSize > 0, "badEventQueueSize must be > 0");
-        resendQueue = new LinkedBlockingQueue<>(badEventQueueSize);
-        // initial sink thread pool
-        int threadNum = tubeConfig.getThreadNum();
-        Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
-        sinkThreadPool = new Thread[threadNum];
-        // initial event queue size
-        int eventQueueSize = tubeConfig.getEventQueueSize();
-        Preconditions.checkArgument(eventQueueSize > 0, "eventQueueSize must be > 0");
-        eventQueue = new LinkedBlockingQueue<>(eventQueueSize);
-        // initial disk rate limiter
-        if (tubeConfig.getDiskIoRatePerSec() != 0) {
-            diskRateLimiter = RateLimiter.create(tubeConfig.getDiskIoRatePerSec());
-        }
-        // register configure change callback functions
-        configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
-
-            @Override
-            public void update() {
-                diffSetPublish(new HashSet<>(topicProperties.values()),
-                        new HashSet<>(configManager.getTopicProperties().values()));
-            }
-        });
-        configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
-
-            @Override
-            public void update() {
-                diffUpdateTubeClient(masterHostAndPortLists,
-                        configManager.getMqClusterUrl2Token().keySet());
-            }
-        });
-    }
-
-    @Override
-    public void start() {
-        if (!this.started.compareAndSet(false, true)) {
-            logger.info("Duplicated call, " + getName() + " has started!");
-            return;
-        }
-        // initial monitors
-        if (statIntervalSec > 0) {
-            // switch for lots of metrics
-            monitorIndex = new MonitorIndex("Tube_Sink", statIntervalSec, maxMonitorCnt);
-            monitorIndexExt = new MonitorIndexExt("Tube_Sink_monitors#" + this.getName(),
-                    statIntervalSec, maxMonitorCnt);
-        }
-        // register metrics
-        ConfigManager configManager = ConfigManager.getInstance();
-        String clusterId =
-                configManager.getCommonProperties().getOrDefault(
-                        ConfigConstants.PROXY_CLUSTER_NAME,
-                        ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
-        this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
-        MetricRegister.register(metricItemSet);
-        // create tube connection
-        if (producerHolder != null) {
-            try {
-                producerHolder.start(new HashSet<>(topicProperties.values()));
-                ConfigManager.getInstance().updMqClusterStatus(true);
-                logger.info("[{}] MQ Cluster service status ready!", getName());
-            } catch (FlumeException e) {
-                logger.error("Unable to start TubeMQ client. Exception follows.", e);
-                super.stop();
-                return;
-            }
-        }
-        // start the cleaner thread
-        super.start();
-        this.canSend = true;
-        for (int i = 0; i < sinkThreadPool.length; i++) {
-            sinkThreadPool[i] = new Thread(new TubeSinkTask(),
-                    getName() + "_tube_sink_sender-" + i);
-            sinkThreadPool[i].start();
-        }
-        logger.info(getName() + " started!");
-    }
-
-    @Override
-    public void stop() {
-        if (!this.started.compareAndSet(true, false)) {
-            logger.info("Duplicated call, " + getName() + " has stopped!");
-            return;
-        }
-        // waiting inflight message processed
-        int waitCount = 0;
-        while (takenMsgCnt.get() > 0 && waitCount++ < 10) {
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                logger.info("Stop thread has been interrupt!");
-                break;
-            }
-        }
-        // close sink thread pool
-        if (sinkThreadPool != null) {
-            for (Thread thread : sinkThreadPool) {
-                if (thread == null) {
-                    continue;
-                }
-                thread.interrupt();
-            }
-        }
-        // close producer holder
-        if (producerHolder != null) {
-            producerHolder.stop();
-        }
-        // stop statistic thread stop
-        SCHEDULED_EXECUTOR_SERVICE.shutdown();
-        // stop monitor index output
-        if (statIntervalSec > 0) {
-            try {
-                monitorIndex.shutDown();
-            } catch (Exception e) {
-                logger.warn("Stats runner interrupted");
-            }
-        }
-        super.stop();
-        logger.info(getName() + " stopped!");
-    }
-
-    @Override
-    public Status process() throws EventDeliveryException {
-        if (!this.started.get()) {
-            return Status.BACKOFF;
-        }
-        Status status = Status.READY;
-        Channel channel = getChannel();
-        Transaction tx = channel.getTransaction();
-        tx.begin();
-        try {
-            Event event = channel.take();
-            if (event != null) {
-                if (diskRateLimiter != null) {
-                    diskRateLimiter.acquire(event.getBody().length);
-                }
-                if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
-                    tx.commit();
-                    cachedMsgCnt.incrementAndGet();
-                } else {
-                    tx.rollback();
-                }
-            } else {
-                status = Status.BACKOFF;
-                tx.commit();
-            }
-        } catch (Throwable t) {
-            logger.error("Process event failed!" + this.getName(), t);
-            try {
-                tx.rollback();
-            } catch (Throwable e) {
-                logger.error("meta sink transaction rollback exception", e);
-            }
-        } finally {
-            tx.close();
-        }
-        return status;
-    }
-
-    private class TubeSinkTask implements Runnable {
-
-        public TubeSinkTask() {
-            // ignore
-        }
-
-        @Override
-        public void run() {
-            Event event = null;
-            EventStat es = null;
-            String topic = null;
-            boolean bChangedInflightValue = false;
-            logger.info("sink task {} started.", Thread.currentThread().getName());
-            while (canSend) {
-                try {
-                    if (!started.get() && cachedMsgCnt.get() <= 0) {
-                        logger.info("Found started is false and taken message count is zero, braek!");
-                        break;
-                    }
-                    if (isOverFlow) {
-                        isOverFlow = false;
-                        Thread.sleep(30);
-                    }
-                    // get event from queues
-                    if (resendQueue.isEmpty()) {
-                        event = eventQueue.poll(2000, TimeUnit.MILLISECONDS);
-                        if (event == null) {
-                            continue;
-                        }
-                        cachedMsgCnt.decrementAndGet();
-                        takenMsgCnt.incrementAndGet();
-                        es = new EventStat(event);
-                    } else {
-                        es = resendQueue.poll();
-                        if (es == null) {
-                            continue;
-                        }
-                        resendMsgCnt.decrementAndGet();
-                        event = es.getEvent();
-                    }
-                    topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
-                    // valid event status
-                    if (StringUtils.isBlank(topic)) {
-                        blankTopicDiscardMsgCnt.incrementAndGet();
-                        takenMsgCnt.decrementAndGet();
-                        if (statIntervalSec > 0) {
-                            monitorIndexExt.incrementAndGet(KEY_SINK_DROPPED);
-                        }
-                        if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
-                            logger.error("No topic specified, just discard the event, event header is "
-                                    + event.getHeaders().toString());
-                        }
-                        continue;
-                    }
-                    // send message
-                    bChangedInflightValue = sendMessage(es, topic);
-                } catch (InterruptedException e) {
-                    logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
-                    return;
-                } catch (Throwable t) {
-                    resendEvent(es, bChangedInflightValue,
-                            DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, t.getMessage());
-                    if (t instanceof TubeClientException) {
-                        String message = t.getMessage();
-                        if (message != null && (message.contains("No available queue for topic")
-                                || message.contains("The brokers of topic are all forbidden"))) {
-                            isOverFlow = true;
-                        }
-                    }
-                    if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
-                        logger.error(
-                                "Sink task fail to send the message, finished = {}, sink.name = {},event.headers= {}",
-                                bChangedInflightValue, Thread.currentThread().getName(),
-                                es.getEvent().getHeaders(), t);
-                    }
-                }
-            }
-            logger.info("sink task {} stopped!", Thread.currentThread().getName());
-        }
-
-        private boolean sendMessage(EventStat es, String topic) throws Exception {
-            String errMsg = "";
-            Event event = es.getEvent();
-            MessageProducer producer = producerHolder.getProducer(topic);
-            if (producer == null) {
-                frozenTopicDiscardMsgCnt.incrementAndGet();
-                takenMsgCnt.decrementAndGet();
-                if (statIntervalSec > 0) {
-                    monitorIndexExt.incrementAndGet(KEY_SINK_DROPPED);
-                }
-                errMsg = "Get producer failed for " + topic;
-                if (MessageUtils.isSinkRspType(event)) {
-                    MessageUtils.sinkReturnRspPackage((SinkRspEvent) event,
-                            DataProxyErrCode.PRODUCER_IS_NULL, errMsg);
-                }
-                if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
-                    logger.error(errMsg);
-                }
-                return false;
-            }
-            if (MSG_DEDUP_HANDLER.judgeDupAndPutMsgSeqId(
-                    event.getHeaders().get(ConfigConstants.SEQUENCE_ID))) {
-                dupDiscardMsgCnt.incrementAndGet();
-                takenMsgCnt.decrementAndGet();
-                if (statIntervalSec > 0) {
-                    monitorIndexExt.incrementAndGet(KEY_SINK_DROPPED);
-                }
-                errMsg = "Duplicated message discard, by uuid = "
-                        + event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
-                if (MessageUtils.isSinkRspType(event)) {
-                    MessageUtils.sinkReturnRspPackage((SinkRspEvent) event,
-                            DataProxyErrCode.DUPLICATED_MESSAGE, errMsg);
-                }
-                logger.info("{} agent package {} existed,just discard.",
-                        Thread.currentThread().getName(),
-                        event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
-                return false;
-            } else {
-                producer.sendMessage(TubeUtils.buildMessage(topic, event), new MyCallback(es));
-                inflightMsgCnt.incrementAndGet();
-                return true;
-            }
-        }
-    }
-
-    private class MyCallback implements MessageSentCallback {
-
-        private EventStat myEventStat;
-        private long sendTime;
-
-        public MyCallback(EventStat eventStat) {
-            this.myEventStat = eventStat;
-            this.sendTime = System.currentTimeMillis();
-        }
-
-        @Override
-        public void onMessageSent(final MessageSentResult result) {
-            if (result.isSuccess()) {
-                successMsgCnt.incrementAndGet();
-                inflightMsgCnt.decrementAndGet();
-                takenMsgCnt.decrementAndGet();
-                this.addStatistics(myEventStat.getEvent(), true, false, sendTime);
-                if (MessageUtils.isSinkRspType(myEventStat.getEvent())) {
-                    MessageUtils.sinkReturnRspPackage((SinkRspEvent) myEventStat.getEvent(),
-                            DataProxyErrCode.SUCCESS, "");
-                }
-            } else {
-                this.addStatistics(myEventStat.getEvent(), false, false, 0);
-                if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) {
-                    logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
-                            result.getErrMsg(), resendQueue.size(),
-                            myEventStat.getEvent().hashCode());
-                    return;
-                } else if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW
-                        && LOG_SINK_TASK_PRINTER.shouldPrint()) {
-                    logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
-                            result.getErrMsg(), resendQueue.size(),
-                            myEventStat.getEvent().hashCode());
-                }
-                resendEvent(myEventStat, true, DataProxyErrCode.MQ_RETURN_ERROR,
-                        result.getErrCode() + "#" + result.getErrMsg());
-            }
-        }
-
-        @Override
-        public void onException(final Throwable e) {
-            addStatistics(myEventStat.getEvent(), false, true, 0);
-            resendEvent(myEventStat, true, DataProxyErrCode.UNKNOWN_ERROR, e.getMessage());
-        }
-
-        /**
-         * Add statistics information
-         *
-         * @param event   the statistic event
-         * @param isSuccess  is processed successfully
-         * @param isException is exception when failure processed
-         * @param sendTime   the send time when success processed
-         */
-        private void addStatistics(Event event, boolean isSuccess,
-                boolean isException, long sendTime) {
-            if (event == null) {
-                return;
-            }
-            // add jmx metric items;
-            TubeSink.this.metricItemSet.fillSinkSendMetricItemsByEvent(
-                    event, sendTime, isSuccess, event.getBody().length);
-            // add audit items;
-            if (isSuccess) {
-                AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
-            }
-            if (statIntervalSec <= 0) {
-                return;
-            }
-            // add monitor items base file storage
-            String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
-            String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
-            String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
-            int intMsgCnt = Integer.parseInt(
-                    event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
-            long dataTimeL = Long.parseLong(
-                    event.getHeaders().get(AttributeConstants.DATA_TIME));
-            Pair<Boolean, String> evenProcType =
-                    MessageUtils.getEventProcType("", "");
-            // build statistic key
-            StringBuilder newBase = new StringBuilder(512)
-                    .append(getName()).append(SEP_HASHTAG).append(topic)
-                    .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
-                    .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
-                    .append(SEP_HASHTAG).append(evenProcType.getRight()).append(SEP_HASHTAG)
-                    .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
-            // count data
-            if (isSuccess) {
-                monitorIndex.addAndGet(newBase.toString(),
-                        intMsgCnt, 1, event.getBody().length, 0);
-                monitorIndexExt.incrementAndGet(KEY_SINK_SUCCESS);
-            } else {
-                monitorIndex.addAndGet(newBase.toString(),
-                        0, 0, 0, intMsgCnt);
-                monitorIndexExt.incrementAndGet(KEY_SINK_FAILURE);
-                if (isException) {
-                    monitorIndexExt.incrementAndGet(KEY_SINK_EXP);
-                }
-            }
-        }
-    }
-
-    private class TubeStatsTask implements Runnable {
-
-        @Override
-        public void run() {
-            if (!started.get()) {
-                return;
-            }
-            logger.info(getName() + "[TubeSink Stats] cachedMsgCnt=" + cachedMsgCnt.get()
-                    + ", takenMsgCnt=" + takenMsgCnt.get()
-                    + ", resendMsgCnt=" + resendMsgCnt.get()
-                    + ", blankTopicDiscardMsgCnt=" + blankTopicDiscardMsgCnt.get()
-                    + ", frozenTopicDiscardMsgCnt=" + frozenTopicDiscardMsgCnt.get()
-                    + ", dupDiscardMsgCnt=" + dupDiscardMsgCnt.get()
-                    + ", inflightMsgCnt=" + inflightMsgCnt.get()
-                    + ", successMsgCnt=" + successMsgCnt.get());
-        }
-    }
-
-    /**
-     * resend event
-     */
-    private void resendEvent(EventStat es, boolean sendFinished,
-            DataProxyErrCode errCode, String errMsg) {
-        try {
-            if (sendFinished) {
-                inflightMsgCnt.decrementAndGet();
-            }
-            if (es == null || es.getEvent() == null) {
-                takenMsgCnt.decrementAndGet();
-                return;
-            }
-            MSG_DEDUP_HANDLER.invalidMsgSeqId(es.getEvent()
-                    .getHeaders().get(ConfigConstants.SEQUENCE_ID));
-            if (MessageUtils.isSinkRspType(es.getEvent())) {
-                MessageUtils.sinkReturnRspPackage((SinkRspEvent) es.getEvent(), errCode, errMsg);
-            } else {
-                if (resendQueue.offer(es)) {
-                    resendMsgCnt.incrementAndGet();
-                } else {
-                    FailoverChannelProcessorHolder.getChannelProcessor().processEvent(es.getEvent());
-                    takenMsgCnt.decrementAndGet();
-                    if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
-                        logger.error(Thread.currentThread().getName()
-                                + " Channel --> Tube --> ResendQueue(full) -->"
-                                + "FailOverChannelProcessor(current code point),"
-                                + " Resend queue is full,Check if Tube server or network is ok.");
-                    }
-                }
-            }
-        } catch (Throwable throwable) {
-            takenMsgCnt.decrementAndGet();
-            if (statIntervalSec > 0) {
-                monitorIndexExt.incrementAndGet(KEY_SINK_DROPPED);
-            }
-            if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
-                logger.error(getName() + " Discard msg because put events to both of queue and "
-                        + "fileChannel fail,current resendQueue.size = "
-                        + resendQueue.size(), throwable);
-            }
-        }
-    }
-
-    /**
-     * Differentiate unpublished topic sets and publish them
-     * attention: only append added topics
-     *
-     * @param curTopicSet   the current used topic set
-     * @param newTopicSet   the latest configured topic set
-     */
-    private void diffSetPublish(Set<String> curTopicSet, Set<String> newTopicSet) {
-        if (!this.started.get()) {
-            logger.info(getName() + " not started, ignore this change!");
-        }
-        if (SetUtils.isEqualSet(curTopicSet, newTopicSet)) {
-            return;
-        }
-        // filter unpublished topics
-        Set<String> addedTopics = new HashSet<>();
-        for (String topic : newTopicSet) {
-            if (StringUtils.isBlank(topic)) {
-                continue;
-            }
-            if (!curTopicSet.contains(topic)) {
-                addedTopics.add(topic);
-            }
-        }
-        // publish them
-        if (!addedTopics.isEmpty()) {
-            if (producerHolder != null) {
-                try {
-                    producerHolder.createProducersByTopicSet(addedTopics);
-                } catch (Throwable e) {
-                    logger.info(getName() + "'s publish new topic set fail.", e);
-                }
-            }
-            logger.info(getName() + "'s topics set has changed, trigger diff publish for {}",
-                    addedTopics);
-            topicProperties = configManager.getTopicProperties();
-        }
-    }
-
-    /**
-     * When masterUrlLists change, update tubeClient
-     * Requirement: when switching the Master cluster,
-     * the DataProxy node must not do the data reporting service
-     *
-     * @param curClusterSet previous masterHostAndPortList set
-     * @param newClusterSet new masterHostAndPortList set
-     */
-    private void diffUpdateTubeClient(Set<String> curClusterSet,
-            Set<String> newClusterSet) {
-        if (!this.started.get()) {
-            logger.info(getName() + " not started, ignore this change!");
-        }
-        if (newClusterSet == null || newClusterSet.isEmpty()
-                || SetUtils.isEqualSet(curClusterSet, newClusterSet)
-                || newClusterSet.contains(usedMasterAddr)) {
-            return;
-        }
-        String newMasterAddr = getFirstClusterAddr(newClusterSet);
-        if (newMasterAddr == null) {
-            return;
-        }
-        TubeProducerHolder newProducerHolder = new TubeProducerHolder(getName(),
-                newMasterAddr, configManager.getMqClusterConfig());
-        try {
-            newProducerHolder.start(new HashSet<>(configManager.getTopicProperties().values()));
-        } catch (Throwable e) {
-            logger.error(getName() + " create new producer holder for " + newMasterAddr
-                    + " failure, throw exception is  {}", e.getMessage());
-            return;
-        }
-        // replace current producer holder
-        final String tmpMasterAddr = usedMasterAddr;
-        TubeProducerHolder tmpProducerHolder = producerHolder;
-        producerHolder = newProducerHolder;
-        usedMasterAddr = newMasterAddr;
-        // close old producer holder
-        if (tmpProducerHolder == null) {
-            diffSetPublish(new HashSet<>(),
-                    new HashSet<>(configManager.getTopicProperties().values()));
-        } else {
-            tmpProducerHolder.stop();
-        }
-        if (!ConfigManager.getInstance().isMqClusterReady()) {
-            ConfigManager.getInstance().updMqClusterStatus(true);
-            logger.info("[{}] MQ Cluster service status ready!", getName());
-        }
-        logger.info(getName() + " switch cluster from "
-                + tmpMasterAddr + " to " + usedMasterAddr);
-    }
-
-    /**
-     * Get first cluster address
-     *
-     * @param clusterSet  cluster set configure
-     * @return  the selected cluster address
-     *          null if set is empty or if items are all blank
-     */
-    private String getFirstClusterAddr(Set<String> clusterSet) {
-        String tmpMasterAddr = null;
-        for (String masterAddr : clusterSet) {
-            if (StringUtils.isBlank(masterAddr)) {
-                continue;
-            }
-            tmpMasterAddr = masterAddr;
-            break;
-        }
-        return tmpMasterAddr;
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstactZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstactZoneWorker.java
deleted file mode 100644
index 3692c0ab3..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstactZoneWorker.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AbstactZoneWorker extends Thread {
-
-    public static final Logger LOG = LoggerFactory.getLogger(AbstactZoneWorker.class);
-
-    protected final String workerName;
-    protected final AbstractZoneSinkContext context;
-
-    protected AbstractZoneProducer zoneProducer;
-    protected LifecycleState status;
-
-    protected int workerIndex;
-
-    /**
-     * Constructor
-     *
-     * @param sinkName
-     * @param workerIndex
-     * @param context
-     */
-    public AbstactZoneWorker(String sinkName, int workerIndex, AbstractZoneSinkContext context,
-            AbstractZoneProducer zoneProducer) {
-        super();
-        this.workerName = sinkName + "-worker-" + workerIndex;
-        this.workerIndex = workerIndex;
-        this.context = context;
-        this.zoneProducer = zoneProducer;
-        this.status = LifecycleState.IDLE;
-    }
-
-    /**
-     * start
-     */
-    @Override
-    public void start() {
-        this.zoneProducer.start();
-        this.status = LifecycleState.START;
-        super.start();
-    }
-
-    /**
-     *
-     * close
-     */
-    public void close() {
-        // close all producers
-        this.zoneProducer.close();
-        this.status = LifecycleState.STOP;
-    }
-
-    /**
-     * run
-     */
-    @Override
-    public void run() {
-        while (status != LifecycleState.STOP) {
-            try {
-                DispatchProfile event = context.getDispatchQueues().get(workerIndex).poll();
-                if (event == null) {
-                    this.sleepOneInterval();
-                    continue;
-                }
-                // metric
-                context.addSendMetric(event, workerName);
-                // send
-                this.zoneProducer.send(event);
-            } catch (Throwable e) {
-                LOG.error(e.getMessage(), e);
-                this.sleepOneInterval();
-            }
-        }
-    }
-
-    /**
-     * sleepOneInterval
-     */
-    private void sleepOneInterval() {
-        try {
-            Thread.sleep(context.getProcessInterval());
-        } catch (InterruptedException e1) {
-            LOG.error(e1.getMessage(), e1);
-        }
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneClusterProducer.java
deleted file mode 100644
index 9fbd07894..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneClusterProducer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-import static org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
-import static org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.flume.Context;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.sdk.commons.protocol.EventConstants;
-
-public abstract class AbstractZoneClusterProducer implements LifecycleAware {
-
-    protected final String workerName;
-    protected final CacheClusterConfig config;
-    protected final AbstractZoneSinkContext sinkContext;
-    protected final Context producerContext;
-    protected final String cacheClusterName;
-    protected LifecycleState state;
-
-    /**
-     * Constructor
-     *
-     * @param workerName
-     * @param config
-     * @param context
-     */
-    public AbstractZoneClusterProducer(String workerName, CacheClusterConfig config, AbstractZoneSinkContext context) {
-        this.workerName = workerName;
-        this.config = config;
-        this.sinkContext = context;
-        this.producerContext = context.getProducerContext();
-        this.state = LifecycleState.IDLE;
-        this.cacheClusterName = config.getClusterName();
-    }
-
-    /**
-     * getLifecycleState
-     *
-     * @return
-     */
-    @Override
-    public LifecycleState getLifecycleState() {
-        return state;
-    }
-
-    /**
-     * send DispatchProfile
-     *
-     * @param event DispatchProfile
-     * @return boolean sendResult
-     */
-    public abstract boolean send(DispatchProfile event);
-
-    /**
-     * encodeCacheMessageHeaders
-     *
-     * @param  event
-     * @return       Map
-     */
-    public Map<String, String> encodeCacheMessageHeaders(DispatchProfile event) {
-        Map<String, String> headers = new HashMap<>();
-        // version int32 protocol version, the value is 1
-        headers.put(HEADER_KEY_VERSION, HEADER_CACHE_VERSION_1);
-        // inlongGroupId string inlongGroupId
-        headers.put(EventConstants.INLONG_GROUP_ID, event.getInlongGroupId());
-        // inlongStreamId string inlongStreamId
-        headers.put(EventConstants.INLONG_STREAM_ID, event.getInlongStreamId());
-        // proxyName string proxy node id, IP or conainer name
-        headers.put(EventConstants.HEADER_KEY_PROXY_NAME, sinkContext.getNodeId());
-        // packTime int64 pack time, milliseconds
-        headers.put(EventConstants.HEADER_KEY_PACK_TIME, String.valueOf(System.currentTimeMillis()));
-        // msgCount int32 message count
-        headers.put(EventConstants.HEADER_KEY_MSG_COUNT, String.valueOf(event.getEvents().size()));
-        // srcLength int32 total length of raw messages body
-        headers.put(EventConstants.HEADER_KEY_SRC_LENGTH, String.valueOf(event.getSize()));
-        // compressType int
-        // compress type of body data
-        // INLONG_NO_COMPRESS = 0,
-        // INLONG_GZ = 1,
-        // INLONG_SNAPPY = 2
-        headers.put(EventConstants.HEADER_KEY_COMPRESS_TYPE,
-                String.valueOf(sinkContext.getCompressType().getNumber()));
-        // messageKey string partition hash key, optional
-        return headers;
-    }
-
-    /**
-     * get cacheClusterName
-     *
-     * @return the cacheClusterName
-     */
-    public String getCacheClusterName() {
-        return cacheClusterName;
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneProducer.java
deleted file mode 100644
index 7be2f38a3..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneProducer.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractZoneProducer {
-
-    public static final Logger LOG = LoggerFactory.getLogger(AbstractZoneProducer.class);
-    public static final int MAX_INDEX = Integer.MAX_VALUE / 2;
-
-    protected final String workerName;
-    protected final AbstractZoneSinkContext context;
-    protected Timer reloadTimer;
-
-    protected List<AbstractZoneClusterProducer> clusterList = new ArrayList<>();
-    protected List<AbstractZoneClusterProducer> deletingClusterList = new ArrayList<>();
-
-    protected AtomicInteger clusterIndex = new AtomicInteger(0);
-
-    public AbstractZoneProducer(String workerName,
-            AbstractZoneSinkContext context) {
-        this.workerName = workerName;
-        this.context = context;
-    }
-
-    /**
-     * start
-     */
-    public void start() {
-        try {
-            this.reload();
-            this.setReloadTimer();
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * close
-     */
-    public void close() {
-        try {
-            this.reloadTimer.cancel();
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-        for (AbstractZoneClusterProducer cluster : this.clusterList) {
-            cluster.stop();
-        }
-    }
-
-    /**
-     * setReloadTimer
-     */
-    private void setReloadTimer() {
-        reloadTimer = new Timer(true);
-        TimerTask task = new TimerTask() {
-
-            public void run() {
-                reload();
-            }
-        };
-        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + context.getReloadInterval()),
-                context.getReloadInterval());
-    }
-
-    /**
-     * reload
-     */
-    public abstract void reload();
-
-    protected void reload(ZoneClusterProducerCalculator zoneClusterProducerCalculator) {
-        try {
-            // stop deleted cluster
-            deletingClusterList.forEach(item -> {
-                item.stop();
-            });
-            deletingClusterList.clear();
-            // update cluster list
-            List<CacheClusterConfig> configList = this.context.getCacheHolder().getConfigList();
-            List<AbstractZoneClusterProducer> newClusterList = new ArrayList<>(configList.size());
-            // prepare
-            Set<String> newClusterNames = new HashSet<>();
-            configList.forEach(item -> {
-                newClusterNames.add(item.getClusterName());
-            });
-            Set<String> oldClusterNames = new HashSet<>();
-            clusterList.forEach(item -> {
-                oldClusterNames.add(item.getCacheClusterName());
-            });
-            // add
-            for (CacheClusterConfig config : configList) {
-                if (!oldClusterNames.contains(config.getClusterName())) {
-                    AbstractZoneClusterProducer cluster = zoneClusterProducerCalculator.calculator(workerName,
-                            config, context);
-                    cluster.start();
-                    newClusterList.add(cluster);
-                }
-            }
-            // remove
-            for (AbstractZoneClusterProducer cluster : this.clusterList) {
-                if (newClusterNames.contains(cluster.getCacheClusterName())) {
-                    newClusterList.add(cluster);
-                } else {
-                    deletingClusterList.add(cluster);
-                }
-            }
-            this.clusterList = newClusterList;
-        } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
-        }
-
-    }
-
-    /**
-     * send
-     *
-     * @param event
-     */
-    public boolean send(DispatchProfile event) {
-        int currentIndex = clusterIndex.getAndIncrement();
-        if (currentIndex > MAX_INDEX) {
-            clusterIndex.set(0);
-        }
-        List<AbstractZoneClusterProducer> currentClusterList = this.clusterList;
-        int currentSize = currentClusterList.size();
-        int realIndex = currentIndex % currentSize;
-        AbstractZoneClusterProducer clusterProducer = currentClusterList.get(realIndex);
-        return clusterProducer.send(event);
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSink.java
deleted file mode 100644
index 9776c83f3..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSink.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.inlong.dataproxy.dispatch.DispatchManager;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.pulsar.PulsarClientService;
-import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
-import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractZoneSink extends AbstractSink implements Configurable {
-
-    public static final Logger LOG = LoggerFactory.getLogger(AbstractZoneSink.class);
-
-    protected Context parentContext;
-    protected AbstractZoneSinkContext context;
-    protected List<AbstactZoneWorker> workers = new ArrayList<>();
-    // message group
-    protected DispatchManager dispatchManager;
-    protected ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues = new ArrayList<>();
-    // scheduled thread pool
-    // reload
-    // dispatch
-    protected ScheduledExecutorService scheduledPool;
-
-    /**
-     * configure
-     *
-     * @param context
-     */
-    @Override
-    public void configure(Context context) {
-        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
-        this.parentContext = context;
-    }
-
-    public void start(ZoneWorkerCalculator zoneWorkerCalculator) {
-        try {
-            if (getChannel() == null) {
-                LOG.error("channel is null");
-            }
-            this.context.start();
-            for (int i = 0; i < context.getMaxThreads(); i++) {
-                LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
-                dispatchQueues.add(dispatchQueue);
-            }
-            this.dispatchManager = new DispatchManager(parentContext, dispatchQueues);
-            this.scheduledPool = Executors.newScheduledThreadPool(2);
-            // dispatch
-            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
-
-                public void run() {
-                    dispatchManager.setNeedOutputOvertimeData();
-                }
-            }, this.dispatchManager.getDispatchTimeout(),
-                    this.dispatchManager.getDispatchTimeout(),
-                    TimeUnit.MILLISECONDS);
-            // create worker
-            for (int i = 0; i < context.getMaxThreads(); i++) {
-                AbstactZoneWorker worker = zoneWorkerCalculator.calculator(this.getName(), i, context);
-                worker.start();
-                this.workers.add(worker);
-            }
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-        super.start();
-    }
-
-    @Deprecated
-    public void diffSetPublish(PulsarClientService pulsarClientService, Set<String> originalSet, Set<String> endSet) {
-        return;
-    }
-
-    @Deprecated
-    public void diffUpdatePulsarClient(PulsarClientService pulsarClientService, Map<String, String> originalCluster,
-            Map<String, String> endCluster) {
-        this.workers.forEach(worker -> {
-            worker.zoneProducer.reload();
-        });
-    }
-
-    /**
-     * stop
-     */
-    @Override
-    public void stop() {
-        for (AbstactZoneWorker worker : workers) {
-            try {
-                worker.close();
-            } catch (Throwable e) {
-                LOG.error(e.getMessage(), e);
-            }
-        }
-        this.context.close();
-        super.stop();
-    }
-
-    /**
-     * process
-     *
-     * @return                        Status
-     * @throws EventDeliveryException
-     */
-    @Override
-    public Sink.Status process() throws EventDeliveryException {
-        this.dispatchManager.outputOvertimeData();
-        Channel channel = getChannel();
-        Transaction tx = channel.getTransaction();
-        tx.begin();
-        try {
-            Event event = channel.take();
-            if (event == null) {
-                tx.commit();
-                return Sink.Status.BACKOFF;
-            }
-            // ProxyEvent
-            if (event instanceof ProxyEvent) {
-                ProxyEvent proxyEvent = (ProxyEvent) event;
-                this.dispatchManager.addEvent(proxyEvent);
-                tx.commit();
-                return Sink.Status.READY;
-            }
-            // ProxyPackEvent
-            if (event instanceof ProxyPackEvent) {
-                ProxyPackEvent packEvent = (ProxyPackEvent) event;
-                this.dispatchManager.addPackEvent(packEvent);
-                tx.commit();
-                return Sink.Status.READY;
-            }
-            tx.commit();
-            this.context.addSendFailMetric();
-            return Sink.Status.READY;
-        } catch (Throwable t) {
-            LOG.error("Process event failed!" + this.getName(), t);
-            try {
-                tx.rollback();
-            } catch (Throwable e) {
-                LOG.error("Channel take transaction rollback exception:" + getName(), e);
-            }
-            return Sink.Status.BACKOFF;
-        } finally {
-            tx.close();
-        }
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSinkContext.java
deleted file mode 100644
index d04d8b225..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSinkContext.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
-import org.apache.inlong.sdk.commons.protocol.ProxySdk;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Date;
-import java.util.Timer;
-import java.util.TimerTask;
-
-/**
- * SinkContext
- */
-public abstract class AbstractZoneSinkContext {
-
-    public static final Logger LOG = LoggerFactory.getLogger(AbstractZoneSinkContext.class);
-
-    public static final String KEY_MAX_THREADS = "maxThreads";
-    public static final String KEY_PROCESS_INTERVAL = "processInterval";
-    public static final String KEY_RELOAD_INTERVAL = "reloadInterval";
-
-    protected final String sinkName;
-    protected final Context sinkContext;
-
-    protected final Channel channel;
-
-    protected final int maxThreads;
-    protected final long processInterval;
-    protected final long reloadInterval;
-
-    protected final DataProxyMetricItemSet metricItemSet;
-    protected Timer reloadTimer;
-
-    public static final String KEY_NODE_ID = "nodeId";
-    public static final String PREFIX_PRODUCER = "producer.";
-    public static final String KEY_COMPRESS_TYPE = "compressType";
-
-    protected ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues = new ArrayList<>();
-
-    protected final String proxyClusterId;
-    protected final String nodeId;
-    protected final Context producerContext;
-    protected final IdTopicConfigHolder idTopicHolder;
-    protected final CacheClusterConfigHolder cacheHolder;
-    protected final ProxySdk.INLONG_COMPRESSED_TYPE compressType;
-
-    /**
-     * Constructor
-     */
-    public AbstractZoneSinkContext(String sinkName, Context context, Channel channel,
-            ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues) {
-        this.sinkName = sinkName;
-        this.sinkContext = context;
-        this.channel = channel;
-        this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
-        this.processInterval = sinkContext.getInteger(KEY_PROCESS_INTERVAL, 100);
-        this.reloadInterval = sinkContext.getLong(KEY_RELOAD_INTERVAL, 60000L);
-        //
-        this.metricItemSet = new DataProxyMetricItemSet(sinkName);
-        MetricRegister.register(this.metricItemSet);
-
-        this.dispatchQueues = dispatchQueues;
-        // proxyClusterId
-        this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
-        // nodeId
-        this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
-        // compressionType
-        String strCompressionType = CommonPropertiesHolder.getString(KEY_COMPRESS_TYPE,
-                ProxySdk.INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name());
-        this.compressType = ProxySdk.INLONG_COMPRESSED_TYPE.valueOf(strCompressionType);
-        // producerContext
-        Map<String, String> producerParams = context.getSubProperties(PREFIX_PRODUCER);
-        this.producerContext = new Context(producerParams);
-        // idTopicHolder
-        Context commonPropertiesContext = new Context(CommonPropertiesHolder.get());
-        this.idTopicHolder = new IdTopicConfigHolder();
-        this.idTopicHolder.configure(commonPropertiesContext);
-        // cacheHolder
-        this.cacheHolder = new CacheClusterConfigHolder();
-        this.cacheHolder.configure(commonPropertiesContext);
-    }
-
-    /**
-     * start
-     */
-    public void start() {
-        try {
-            this.reload();
-            this.setReloadTimer();
-            this.idTopicHolder.start();
-            this.cacheHolder.start();
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * close
-     */
-    public void close() {
-        try {
-            this.reloadTimer.cancel();
-            this.idTopicHolder.close();
-            this.cacheHolder.close();
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * setReloadTimer
-     */
-    protected void setReloadTimer() {
-        reloadTimer = new Timer(true);
-        TimerTask task = new TimerTask() {
-
-            public void run() {
-                reload();
-            }
-        };
-        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
-    }
-
-    /**
-     * reload
-     */
-    public void reload() {
-    }
-
-    /**
-     * get sinkName
-     *
-     * @return the sinkName
-     */
-    public String getSinkName() {
-        return sinkName;
-    }
-
-    /**
-     * get sinkContext
-     *
-     * @return the sinkContext
-     */
-    public Context getSinkContext() {
-        return sinkContext;
-    }
-
-    /**
-     * get channel
-     *
-     * @return the channel
-     */
-    public Channel getChannel() {
-        return channel;
-    }
-
-    /**
-     * get maxThreads
-     *
-     * @return the maxThreads
-     */
-    public int getMaxThreads() {
-        return maxThreads;
-    }
-
-    /**
-     * get processInterval
-     *
-     * @return the processInterval
-     */
-    public long getProcessInterval() {
-        return processInterval;
-    }
-
-    /**
-     * get reloadInterval
-     *
-     * @return the reloadInterval
-     */
-    public long getReloadInterval() {
-        return reloadInterval;
-    }
-
-    /**
-     * get metricItemSet
-     *
-     * @return the metricItemSet
-     */
-    public DataProxyMetricItemSet getMetricItemSet() {
-        return metricItemSet;
-    }
-
-    /**
-     * get proxyClusterId
-     *
-     * @return the proxyClusterId
-     */
-    public String getProxyClusterId() {
-        return proxyClusterId;
-    }
-
-    /**
-     * get producerContext
-     *
-     * @return the producerContext
-     */
-    public Context getProducerContext() {
-        return producerContext;
-    }
-
-    /**
-     * get idTopicHolder
-     *
-     * @return the idTopicHolder
-     */
-    public IdTopicConfigHolder getIdTopicHolder() {
-        return idTopicHolder;
-    }
-
-    /**
-     * get cacheHolder
-     *
-     * @return the cacheHolder
-     */
-    public CacheClusterConfigHolder getCacheHolder() {
-        return cacheHolder;
-    }
-
-    /**
-     * get compressType
-     *
-     * @return the compressType
-     */
-    public ProxySdk.INLONG_COMPRESSED_TYPE getCompressType() {
-        return compressType;
-    }
-
-    /**
-     * get nodeId
-     *
-     * @return the nodeId
-     */
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    /**
-     * fillInlongId
-     *
-     * @param currentRecord
-     * @param dimensions
-     */
-    public static void fillInlongId(DispatchProfile currentRecord, Map<String, String> dimensions) {
-        String inlongGroupId = currentRecord.getInlongGroupId();
-        inlongGroupId = (StringUtils.isBlank(inlongGroupId)) ? "-" : inlongGroupId;
-        String inlongStreamId = currentRecord.getInlongStreamId();
-        inlongStreamId = (StringUtils.isBlank(inlongStreamId)) ? "-" : inlongStreamId;
-        dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, inlongGroupId);
-        dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId);
-    }
-
-    /**
-     * addSendResultMetric
-     *
-     * @param currentRecord
-     * @param bid
-     * @param result
-     * @param sendTime
-     */
-    public void addSendResultMetric(DispatchProfile currentRecord, String bid, boolean result, long sendTime) {
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
-        // metric
-        fillInlongId(currentRecord, dimensions);
-        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
-        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
-        long dispatchTime = currentRecord.getDispatchTime();
-        long auditFormatTime = dispatchTime - dispatchTime % CommonPropertiesHolder.getAuditFormatInterval();
-        dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
-        DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
-        long count = currentRecord.getCount();
-        long size = currentRecord.getSize();
-        if (result) {
-            metricItem.sendSuccessCount.addAndGet(count);
-            metricItem.sendSuccessSize.addAndGet(size);
-            currentRecord.getEvents().forEach((event) -> {
-                AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
-            });
-            if (sendTime > 0) {
-                long currentTime = System.currentTimeMillis();
-                currentRecord.getEvents().forEach((event) -> {
-                    long sinkDuration = currentTime - sendTime;
-                    long nodeDuration = currentTime - event.getSourceTime();
-                    long wholeDuration = currentTime - event.getMsgTime();
-                    metricItem.sinkDuration.addAndGet(sinkDuration);
-                    metricItem.nodeDuration.addAndGet(nodeDuration);
-                    metricItem.wholeDuration.addAndGet(wholeDuration);
-                });
-            }
-        } else {
-            metricItem.sendFailCount.addAndGet(count);
-            metricItem.sendFailSize.addAndGet(size);
-        }
-    }
-
-    /**
-     * get dispatchQueue
-     *
-     * @return the dispatchQueue
-     */
-    public ArrayList<LinkedBlockingQueue<DispatchProfile>> getDispatchQueues() {
-        return dispatchQueues;
-    }
-
-    public void setDispatchQueues(
-            ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues) {
-        this.dispatchQueues = dispatchQueues;
-    }
-
-    /**
-     * processSendFail
-     * @param currentRecord
-     * @param producerTopic
-     * @param sendTime
-     */
-    public void processSendFail(DispatchProfile currentRecord, String producerTopic, long sendTime) {
-        if (currentRecord.isResend()) {
-            dispatchQueues.get(currentRecord.getSendIndex() % maxThreads).offer(currentRecord);
-            this.addSendResultMetric(currentRecord, producerTopic, false, sendTime);
-        } else {
-            currentRecord.fail();
-        }
-    }
-
-    /**
-     * addSendFailMetric
-     */
-    public void addSendFailMetric() {
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
-        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
-        long msgTime = System.currentTimeMillis();
-        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
-        dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
-        DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
-        metricItem.sendFailCount.incrementAndGet();
-    }
-
-    /**
-     * addSendMetric
-     *
-     * @param currentRecord
-     * @param bid
-     */
-    public void addSendMetric(DispatchProfile currentRecord, String bid) {
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
-        // metric
-        fillInlongId(currentRecord, dimensions);
-        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
-        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
-        long msgTime = currentRecord.getDispatchTime();
-        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
-        dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
-        DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
-        long count = currentRecord.getCount();
-        long size = currentRecord.getSize();
-        metricItem.sendCount.addAndGet(count);
-        metricItem.sendSize.addAndGet(size);
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/ZoneClusterProducerCalculator.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/ZoneClusterProducerCalculator.java
deleted file mode 100644
index 673af1feb..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/ZoneClusterProducerCalculator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-
-@FunctionalInterface
-public interface ZoneClusterProducerCalculator {
-
-    AbstractZoneClusterProducer calculator(String workerName,
-            CacheClusterConfig config, AbstractZoneSinkContext context);
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/ZoneWorkerCalculator.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/ZoneWorkerCalculator.java
deleted file mode 100644
index 6bb300f9d..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/ZoneWorkerCalculator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-public interface ZoneWorkerCalculator {
-
-    AbstactZoneWorker calculator(String sinkName, int workerIndex, AbstractZoneSinkContext context);
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaClusterProducer.java
deleted file mode 100644
index 9e85028a4..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaClusterProducer.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.kafkazone;
-
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
-import org.apache.inlong.sdk.commons.protocol.EventUtils;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * KafkaClusterProducer
- */
-public class KafkaClusterProducer extends AbstractZoneClusterProducer {
-
-    public static final Logger LOG = LoggerFactory.getLogger(KafkaClusterProducer.class);
-
-    // kafka producer
-    private KafkaProducer<String, byte[]> producer;
-
-    /**
-     * Constructor
-     *
-     * @param workerName
-     * @param config
-     * @param context
-     */
-    public KafkaClusterProducer(String workerName, CacheClusterConfig config, KafkaZoneSinkContext context) {
-        super(workerName, config, context);
-    }
-
-    /**
-     * start
-     */
-    @Override
-    public void start() {
-        super.state = LifecycleState.START;
-        // create kafka producer
-        try {
-            // prepare configuration
-            Properties props = new Properties();
-            props.putAll(super.producerContext.getParameters());
-            props.putAll(config.getParams());
-            LOG.info("try to create kafka client:{}", props);
-            producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
-            LOG.info("create new producer success:{}", producer);
-        } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * stop
-     */
-    @Override
-    public void stop() {
-        super.state = LifecycleState.STOP;
-        // kafka producer
-        this.producer.close();
-    }
-
-    /**
-     * send
-     * 
-     * @param event
-     */
-    @Override
-    public boolean send(DispatchProfile event) {
-        try {
-            // topic
-            String topic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
-            if (topic == null) {
-                sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
-                return false;
-            }
-            // create producer failed
-            if (producer == null) {
-                sinkContext.processSendFail(event, topic, 0);
-                return false;
-            }
-            // headers
-            Map<String, String> headers = this.encodeCacheMessageHeaders(event);
-            // compress
-            byte[] bodyBytes = EventUtils.encodeCacheMessageBody(sinkContext.getCompressType(), event.getEvents());
-            // sendAsync
-            long sendTime = System.currentTimeMillis();
-
-            // prepare ProducerRecord
-            ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, bodyBytes);
-            // add headers
-            headers.forEach((key, value) -> {
-                producerRecord.headers().add(key, value.getBytes());
-            });
-
-            // callback
-            Callback callback = new Callback() {
-
-                @Override
-                public void onCompletion(RecordMetadata arg0, Exception ex) {
-                    if (ex != null) {
-                        LOG.error("Send fail:{}", ex.getMessage());
-                        LOG.error(ex.getMessage(), ex);
-                        if (event.isResend()) {
-                            sinkContext.processSendFail(event, topic, sendTime);
-                        } else {
-                            event.fail();
-                        }
-                    } else {
-                        sinkContext.addSendResultMetric(event, topic, true, sendTime);
-                        event.ack();
-                    }
-                }
-            };
-            producer.send(producerRecord, callback);
-            return true;
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-            sinkContext.processSendFail(event, event.getUid(), 0);
-            return false;
-        }
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneProducer.java
deleted file mode 100644
index 3376c0d9b..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneProducer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.kafkazone;
-
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneProducer;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext;
-import org.apache.inlong.dataproxy.sink.mqzone.ZoneClusterProducerCalculator;
-
-public class KafkaZoneProducer extends AbstractZoneProducer implements ZoneClusterProducerCalculator {
-
-    /**
-     * Constructor
-     * 
-     * @param workerName
-     * @param context
-     */
-    public KafkaZoneProducer(String workerName, KafkaZoneSinkContext context) {
-        super(workerName, context);
-    }
-
-    /**
-     * reload
-     */
-    public void reload() {
-        super.reload(this);
-    }
-
-    @Override
-    public AbstractZoneClusterProducer calculator(String workerName, CacheClusterConfig config,
-            AbstractZoneSinkContext context) {
-        return new KafkaClusterProducer(workerName, config, (KafkaZoneSinkContext) context);
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneSink.java
deleted file mode 100644
index c3a71b76f..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneSink.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.kafkazone;
-
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * KafkaZoneSink
- */
-public class KafkaZoneSink extends AbstractZoneSink {
-
-    public static final Logger LOG = LoggerFactory.getLogger(KafkaZoneSink.class);
-
-    /**
-     * start
-     */
-    @Override
-    public void start() {
-        try {
-            super.context = new KafkaZoneSinkContext(getName(), parentContext, getChannel(), super.dispatchQueues);
-            super.start((sinkName, workIndex, context) -> new KafkaZoneWorker(sinkName, workIndex,
-                    (KafkaZoneSinkContext) context));
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-        super.start();
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneSinkContext.java
deleted file mode 100644
index d6545cf78..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneSinkContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.kafkazone;
-
-import java.util.ArrayList;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * 
- * KafkaZoneSinkContext
- */
-public class KafkaZoneSinkContext extends AbstractZoneSinkContext {
-
-    /**
-     * Constructor
-     * 
-     * @param context
-     */
-    public KafkaZoneSinkContext(String sinkName, Context context, Channel channel,
-            ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues) {
-        super(sinkName, context, channel, dispatchQueues);
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneWorker.java
deleted file mode 100644
index 7f143886c..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneWorker.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.kafkazone;
-
-import org.apache.inlong.dataproxy.sink.mqzone.AbstactZoneWorker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * KafkaZoneWorker
- */
-public class KafkaZoneWorker extends AbstactZoneWorker {
-
-    public static final Logger LOG = LoggerFactory.getLogger(KafkaZoneWorker.class);
-
-    /**
-     * Constructor
-     * 
-     * @param sinkName
-     * @param workerIndex
-     * @param context
-     */
-    public KafkaZoneWorker(String sinkName, int workerIndex, KafkaZoneSinkContext context) {
-        super(sinkName, workerIndex, context,
-                new KafkaZoneProducer(sinkName + "-worker-" + workerIndex, context));
-
-    }
-
-    /**
-     * run
-     */
-    @Override
-    public void run() {
-        LOG.info(String.format("start KafkaZoneWorker:%s", this.workerName));
-        super.run();
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarClusterProducer.java
deleted file mode 100644
index 2de80ec26..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarClusterProducer.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.pulsarzone;
-
-import org.apache.commons.lang.math.NumberUtils;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
-import org.apache.inlong.sdk.commons.protocol.EventUtils;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageRoutingMode;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerAccessMode;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SizeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.security.SecureRandom;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_AUTHENTICATION;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_BATCHINGMAXBYTES;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_BATCHINGMAXMESSAGES;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_BATCHINGMAXPUBLISHDELAY;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_BLOCKIFQUEUEFULL;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_COMPRESSIONTYPE;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_CONNECTIONSPERBROKER;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_ENABLEBATCHING;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_IOTHREADS;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_MAXPENDINGMESSAGES;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_MAXPENDINGMESSAGESACROSSPARTITIONS;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_MEMORYLIMIT;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_NAMESPACE;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_SENDTIMEOUT;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_SERVICE_URL;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_STATS_INTERVAL_SECONDS;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_TENANT;
-
-/**
- * PulsarClusterProducer
- */
-public class PulsarClusterProducer extends AbstractZoneClusterProducer {
-
-    public static final Logger LOG = LoggerFactory.getLogger(PulsarClusterProducer.class);
-
-    private String tenant;
-    private String namespace;
-
-    /**
-     * pulsar client
-     */
-    private PulsarClient client;
-    private ProducerBuilder<byte[]> baseBuilder;
-    private Map<String, Producer<byte[]>> producerMap = new ConcurrentHashMap<>();
-
-    /**
-     * Constructor
-     * 
-     * @param workerName Worker name
-     * @param config Cache cluster configuration
-     * @param context Sink context
-     */
-    public PulsarClusterProducer(String workerName, CacheClusterConfig config, PulsarZoneSinkContext context) {
-        super(workerName, config, context);
-        this.tenant = config.getParams().get(KEY_TENANT);
-        this.namespace = config.getParams().get(KEY_NAMESPACE);
-    }
-
-    /**
-     * start
-     */
-    @Override
-    public void start() {
-        this.state = LifecycleState.START;
-        // create pulsar client
-        try {
-            String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
-            String authentication = config.getParams().get(KEY_AUTHENTICATION);
-            this.client = PulsarClient.builder()
-                    .serviceUrl(serviceUrl)
-                    .authentication(AuthenticationFactory.token(authentication))
-                    .ioThreads(producerContext.getInteger(KEY_IOTHREADS, 1))
-                    .memoryLimit(producerContext.getLong(KEY_MEMORYLIMIT, 1073741824L), SizeUnit.BYTES)
-                    .connectionsPerBroker(producerContext.getInteger(KEY_CONNECTIONSPERBROKER, 10))
-                    .statsInterval(NumberUtils.toLong(config.getParams().get(KEY_STATS_INTERVAL_SECONDS), -1),
-                            TimeUnit.SECONDS)
-                    .build();
-            this.baseBuilder = client.newProducer();
-            // Map<String, Object> builderConf = new HashMap<>();
-            // builderConf.putAll(context.getParameters());
-            this.baseBuilder
-                    .sendTimeout(producerContext.getInteger(KEY_SENDTIMEOUT, 0), TimeUnit.MILLISECONDS)
-                    .maxPendingMessages(producerContext.getInteger(KEY_MAXPENDINGMESSAGES, 500))
-                    .maxPendingMessagesAcrossPartitions(
-                            producerContext.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 60000));
-            this.baseBuilder
-                    .batchingMaxMessages(producerContext.getInteger(KEY_BATCHINGMAXMESSAGES, 500))
-                    .batchingMaxPublishDelay(producerContext.getInteger(KEY_BATCHINGMAXPUBLISHDELAY, 100),
-                            TimeUnit.MILLISECONDS)
-                    .batchingMaxBytes(producerContext.getInteger(KEY_BATCHINGMAXBYTES, 131072));
-            this.baseBuilder
-                    .accessMode(ProducerAccessMode.Shared)
-                    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
-                    .blockIfQueueFull(producerContext.getBoolean(KEY_BLOCKIFQUEUEFULL, true));
-            this.baseBuilder
-                    .roundRobinRouterBatchingPartitionSwitchFrequency(
-                            producerContext.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 60))
-                    .enableBatching(producerContext.getBoolean(KEY_ENABLEBATCHING, true))
-                    .compressionType(this.getPulsarCompressionType());
-        } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * getPulsarCompressionType
-     * 
-     * @return CompressionType LZ4/NONE/ZLIB/ZSTD/SNAPPY
-     */
-    private CompressionType getPulsarCompressionType() {
-        String type = this.producerContext.getString(KEY_COMPRESSIONTYPE, CompressionType.SNAPPY.name());
-        switch (type) {
-            case "LZ4":
-                return CompressionType.LZ4;
-            case "NONE":
-                return CompressionType.NONE;
-            case "ZLIB":
-                return CompressionType.ZLIB;
-            case "ZSTD":
-                return CompressionType.ZSTD;
-            case "SNAPPY":
-                return CompressionType.SNAPPY;
-            default:
-                return CompressionType.NONE;
-        }
-    }
-
-    /**
-     * stop
-     */
-    @Override
-    public void stop() {
-        super.state = LifecycleState.STOP;
-        //
-        for (Entry<String, Producer<byte[]>> entry : this.producerMap.entrySet()) {
-            try {
-                entry.getValue().close();
-            } catch (PulsarClientException e) {
-                LOG.error(e.getMessage(), e);
-            }
-        }
-        try {
-            this.client.close();
-        } catch (PulsarClientException e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * send DispatchProfile
-     * 
-     * @param event DispatchProfile
-     * @return boolean sendResult
-     */
-    @Override
-    public boolean send(DispatchProfile event) {
-        try {
-            // topic
-            String producerTopic = this.getProducerTopic(event);
-            if (producerTopic == null) {
-                sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
-                event.fail();
-                return false;
-            }
-            // get producer
-            Producer<byte[]> producer = this.producerMap.get(producerTopic);
-            if (producer == null) {
-                try {
-                    LOG.info("try to new a object for topic " + producerTopic);
-                    SecureRandom secureRandom = new SecureRandom(
-                            (workerName + "-" + cacheClusterName + "-" + producerTopic + System.currentTimeMillis())
-                                    .getBytes());
-                    String producerName = workerName + "-" + cacheClusterName + "-" + producerTopic + "-"
-                            + secureRandom.nextLong();
-                    producer = baseBuilder.clone().topic(producerTopic)
-                            .producerName(producerName)
-                            .create();
-                    LOG.info("create new producer success:{}", producer.getProducerName());
-                    Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(producerTopic, producer);
-                    if (oldProducer != null) {
-                        producer.close();
-                        LOG.info("close producer success:{}", producer.getProducerName());
-                        producer = oldProducer;
-                    }
-                } catch (Throwable ex) {
-                    LOG.error("create new producer failed", ex);
-                }
-            }
-            // create producer failed
-            if (producer == null) {
-                sinkContext.processSendFail(event, producerTopic, 0);
-                return false;
-            }
-            // headers
-            Map<String, String> headers = this.encodeCacheMessageHeaders(event);
-            // compress
-            byte[] bodyBytes = EventUtils.encodeCacheMessageBody(sinkContext.getCompressType(), event.getEvents());
-            // sendAsync
-            long sendTime = System.currentTimeMillis();
-            CompletableFuture<MessageId> future = producer.newMessage().properties(headers)
-                    .value(bodyBytes).sendAsync();
-            // callback
-            future.whenCompleteAsync((msgId, ex) -> {
-                if (ex != null) {
-                    LOG.error("Send fail:{}", ex.getMessage());
-                    LOG.error(ex.getMessage(), ex);
-                    sinkContext.processSendFail(event, producerTopic, sendTime);
-                } else {
-                    sinkContext.addSendResultMetric(event, producerTopic, true, sendTime);
-                    event.ack();
-                }
-            });
-            return true;
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-            sinkContext.processSendFail(event, event.getUid(), 0);
-            return false;
-        }
-    }
-
-    /**
-     * getProducerTopic
-     * 
-     * @param event DispatchProfile
-     * @return String Full topic name
-     */
-    private String getProducerTopic(DispatchProfile event) {
-        String baseTopic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
-        if (baseTopic == null) {
-            return null;
-        }
-        StringBuilder builder = new StringBuilder();
-        if (tenant != null) {
-            builder.append(tenant).append("/");
-        }
-        if (namespace != null) {
-            builder.append(namespace).append("/");
-        }
-        builder.append(baseTopic);
-        return builder.toString();
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneProducer.java
deleted file mode 100644
index b2fb7819e..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneProducer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.pulsarzone;
-
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneProducer;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext;
-import org.apache.inlong.dataproxy.sink.mqzone.ZoneClusterProducerCalculator;
-
-public class PulsarZoneProducer extends AbstractZoneProducer implements ZoneClusterProducerCalculator {
-
-    public PulsarZoneProducer(String workerName, AbstractZoneSinkContext context) {
-        super(workerName, context);
-    }
-
-    /**
-     * reload
-     */
-    public void reload() {
-        super.reload(this);
-    }
-
-    @Override
-    public AbstractZoneClusterProducer calculator(String workerName, CacheClusterConfig config,
-            AbstractZoneSinkContext context) {
-        return new PulsarClusterProducer(workerName, config, (PulsarZoneSinkContext) context);
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneSink.java
deleted file mode 100644
index 5052d0c47..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneSink.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.pulsarzone;
-
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * PulsarZoneSink
- */
-public class PulsarZoneSink extends AbstractZoneSink {
-
-    public static final Logger LOG = LoggerFactory.getLogger(PulsarZoneSink.class);
-
-    /**
-     * start
-     */
-    @Override
-    public void start() {
-        try {
-            super.context = new PulsarZoneSinkContext(getName(), parentContext, getChannel(), super.dispatchQueues);
-            super.start((sinkName, workIndex, context) -> new PulsarZoneWorker(sinkName, workIndex,
-                    (PulsarZoneSinkContext) context));
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-        super.start();
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneSinkContext.java
deleted file mode 100644
index 97db6dcac..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneSinkContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.pulsarzone;
-
-import java.util.ArrayList;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * 
- * PulsarZoneSinkContext
- */
-public class PulsarZoneSinkContext extends AbstractZoneSinkContext {
-
-    /**
-     * Constructor
-     * 
-     * @param context
-     */
-    public PulsarZoneSinkContext(String sinkName, Context context, Channel channel,
-            ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues) {
-        super(sinkName, context, channel, dispatchQueues);
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneWorker.java
deleted file mode 100644
index 8332b3b50..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneWorker.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.pulsarzone;
-
-import org.apache.inlong.dataproxy.sink.mqzone.AbstactZoneWorker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * PulsarZoneWorker
- */
-public class PulsarZoneWorker extends AbstactZoneWorker {
-
-    public static final Logger LOG = LoggerFactory.getLogger(PulsarZoneWorker.class);
-
-    /**
-     * Constructor
-     * 
-     * @param sinkName
-     * @param workerIndex
-     * @param context
-     */
-    public PulsarZoneWorker(String sinkName, int workerIndex, PulsarZoneSinkContext context) {
-        super(sinkName, workerIndex, context,
-                new PulsarZoneProducer(sinkName + "-worker-" + workerIndex, context));
-
-    }
-
-    /**
-     * run
-     */
-    @Override
-    public void run() {
-        LOG.info(String.format("start PulsarZoneWorker:%s", super.workerName));
-        super.run();
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeClusterProducer.java
deleted file mode 100644
index 5ab814ff4..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeClusterProducer.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.tubezone;
-
-import org.apache.flume.Context;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
-import org.apache.inlong.sdk.commons.protocol.EventUtils;
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
-import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
-import org.apache.inlong.tubemq.client.producer.MessageProducer;
-import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
-import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * TubeClusterProducer
- */
-public class TubeClusterProducer extends AbstractZoneClusterProducer {
-
-    public static final Logger LOG = LoggerFactory.getLogger(TubeClusterProducer.class);
-    private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
-
-    // parameter
-    private String masterHostAndPortList;
-    private long linkMaxAllowedDelayedMsgCount;
-    private long sessionWarnDelayedMsgCount;
-    private long sessionMaxAllowedDelayedMsgCount;
-    private long nettyWriteBufferHighWaterMark;
-    // tube producer
-    private TubeMultiSessionFactory sessionFactory;
-    private MessageProducer producer;
-    private Set<String> topicSet = new HashSet<>();
-
-    /**
-     * Constructor
-     * 
-     * @param workerName
-     * @param config
-     * @param context
-     */
-    public TubeClusterProducer(String workerName, CacheClusterConfig config, TubeZoneSinkContext context) {
-        super(workerName, config, context);
-    }
-
-    /**
-     * start
-     */
-    @Override
-    public void start() {
-        super.state = LifecycleState.START;
-        // create tube producer
-        try {
-            // prepare configuration
-            TubeClientConfig conf = initTubeConfig();
-            LOG.info("try to create producer:{}", conf.toJsonString());
-            this.sessionFactory = new TubeMultiSessionFactory(conf);
-            this.producer = sessionFactory.createProducer();
-            LOG.info("create new producer success:{}", producer);
-        } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * initTubeConfig
-     * @return
-     *
-     * @throws Exception
-     */
-    private TubeClientConfig initTubeConfig() throws Exception {
-        // get parameter
-        Context configContext = new Context(this.producerContext.getParameters());
-        configContext.putAll(this.config.getParams());
-        masterHostAndPortList = configContext.getString(MASTER_HOST_PORT_LIST);
-        linkMaxAllowedDelayedMsgCount = configContext.getLong(ConfigConstants.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT,
-                80000L);
-        sessionWarnDelayedMsgCount = configContext.getLong(ConfigConstants.SESSION_WARN_DELAYED_MSG_COUNT,
-                2000000L);
-        sessionMaxAllowedDelayedMsgCount = configContext.getLong(
-                ConfigConstants.SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT,
-                4000000L);
-        nettyWriteBufferHighWaterMark = configContext.getLong(ConfigConstants.NETTY_WRITE_BUFFER_HIGH_WATER_MARK,
-                15 * 1024 * 1024L);
-        // config
-        final TubeClientConfig tubeClientConfig = new TubeClientConfig(this.masterHostAndPortList);
-        tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(linkMaxAllowedDelayedMsgCount);
-        tubeClientConfig.setSessionWarnDelayedMsgCount(sessionWarnDelayedMsgCount);
-        tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(sessionMaxAllowedDelayedMsgCount);
-        tubeClientConfig.setNettyWriteBufferHighWaterMark(nettyWriteBufferHighWaterMark);
-        tubeClientConfig.setHeartbeatPeriodMs(15000L);
-        tubeClientConfig.setRpcTimeoutMs(20000L);
-
-        return tubeClientConfig;
-    }
-
-    /**
-     * stop
-     */
-    @Override
-    public void stop() {
-        super.state = LifecycleState.STOP;
-        // producer
-        if (this.producer != null) {
-            try {
-                this.producer.shutdown();
-            } catch (Throwable e) {
-                LOG.error(e.getMessage(), e);
-            }
-        }
-        if (this.sessionFactory != null) {
-            try {
-                this.sessionFactory.shutdown();
-            } catch (TubeClientException e) {
-                LOG.error(e.getMessage(), e);
-            }
-        }
-    }
-
-    /**
-     * send
-     *
-     * @param event
-     */
-    @Override
-    public boolean send(DispatchProfile event) {
-        try {
-            // topic
-            String topic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
-            if (topic == null) {
-                sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
-                return false;
-            }
-            // publish
-            if (!this.topicSet.contains(topic)) {
-                this.producer.publish(topic);
-                this.topicSet.add(topic);
-            }
-            // create producer failed
-            if (producer == null) {
-                sinkContext.processSendFail(event, topic, 0);
-                return false;
-            }
-            // headers
-            Map<String, String> headers = this.encodeCacheMessageHeaders(event);
-            // compress
-            byte[] bodyBytes = EventUtils.encodeCacheMessageBody(sinkContext.getCompressType(), event.getEvents());
-            // sendAsync
-            Message message = new Message(topic, bodyBytes);
-            // add headers
-            headers.forEach((key, value) -> {
-                message.setAttrKeyVal(key, value);
-            });
-            // callback
-            long sendTime = System.currentTimeMillis();
-            MessageSentCallback callback = new MessageSentCallback() {
-
-                @Override
-                public void onMessageSent(MessageSentResult result) {
-                    sinkContext.addSendResultMetric(event, topic, true, sendTime);
-                    event.ack();
-                }
-
-                @Override
-                public void onException(Throwable ex) {
-                    LOG.error("Send fail:{}", ex.getMessage());
-                    LOG.error(ex.getMessage(), ex);
-                    sinkContext.processSendFail(event, topic, sendTime);
-                }
-            };
-            producer.sendMessage(message, callback);
-            return true;
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-            sinkContext.processSendFail(event, event.getUid(), 0);
-            return false;
-        }
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneProducer.java
deleted file mode 100644
index 214bb84c0..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneProducer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.tubezone;
-
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneProducer;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext;
-import org.apache.inlong.dataproxy.sink.mqzone.ZoneClusterProducerCalculator;
-
-public class TubeZoneProducer extends AbstractZoneProducer implements ZoneClusterProducerCalculator {
-
-    /**
-     * Constructor
-     * 
-     * @param workerName
-     * @param context
-     */
-
-    public TubeZoneProducer(String workerName, TubeZoneSinkContext context) {
-        super(workerName, context);
-    }
-
-    /**
-     * reload
-     */
-    public void reload() {
-        super.reload(this);
-    }
-
-    @Override
-    public AbstractZoneClusterProducer calculator(String workerName, CacheClusterConfig config,
-            AbstractZoneSinkContext context) {
-        return new TubeClusterProducer(workerName, config, (TubeZoneSinkContext) context);
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneSink.java
deleted file mode 100644
index 764db484e..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneSink.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.tubezone;
-
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * TubeZoneSink
- */
-public class TubeZoneSink extends AbstractZoneSink {
-
-    public static final Logger LOG = LoggerFactory.getLogger(TubeZoneSink.class);
-
-    /**
-     * start
-     */
-    @Override
-    public void start() {
-        try {
-            super.context = new TubeZoneSinkContext(getName(), parentContext, getChannel(), super.dispatchQueues);
-            super.start((sinkName, workIndex, context) -> {
-                return new TubeZoneWorker(sinkName, workIndex, (TubeZoneSinkContext) context);
-            });
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-        super.start();
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneSinkContext.java
deleted file mode 100644
index 68946cc40..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneSinkContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.tubezone;
-
-import java.util.ArrayList;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * 
- * TubeZoneSinkContext
- */
-public class TubeZoneSinkContext extends AbstractZoneSinkContext {
-
-    /**
-     * Constructor
-     * 
-     * @param context
-     */
-    public TubeZoneSinkContext(String sinkName, Context context, Channel channel,
-            ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues) {
-        super(sinkName, context, channel, dispatchQueues);
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneWorker.java
deleted file mode 100644
index 70d1eede9..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneWorker.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.tubezone;
-
-import org.apache.inlong.dataproxy.sink.mqzone.AbstactZoneWorker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * TubeZoneWorker
- */
-public class TubeZoneWorker extends AbstactZoneWorker {
-
-    public static final Logger LOG = LoggerFactory.getLogger(TubeZoneWorker.class);
-
-    /**
-     * Constructor
-     * 
-     * @param sinkName
-     * @param workerIndex
-     * @param context
-     */
-    public TubeZoneWorker(String sinkName, int workerIndex, TubeZoneSinkContext context) {
-        super(sinkName, workerIndex, context,
-                new TubeZoneProducer(sinkName + "-worker-" + workerIndex, context));
-    }
-
-    /**
-     * run
-     */
-    @Override
-    public void run() {
-        LOG.info(String.format("start TubeZoneWorker:%s", super.workerName));
-        super.run();
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/CreatePulsarClientCallBack.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/CreatePulsarClientCallBack.java
deleted file mode 100644
index d2c6c67c6..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/CreatePulsarClientCallBack.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar;
-
-public interface CreatePulsarClientCallBack {
-
-    void handleCreateClientSuccess(String url);
-
-    void handleCreateClientException(String url);
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
deleted file mode 100644
index d9912273b..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ /dev/null
@@ -1,563 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.sink.EventStat;
-import org.apache.inlong.dataproxy.sink.PulsarSink;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class PulsarClientService {
-
-    private static final Logger logger = LoggerFactory.getLogger(PulsarClientService.class);
-    private MQClusterConfig pulsarConfig;
-    public Map<String, List<TopicProducerInfo>> producerInfoMap;
-    public Map<String, AtomicLong> topicSendIndexMap;
-    public Map<String, PulsarClient> pulsarClients = new ConcurrentHashMap<>();
-    public int pulsarClientIoThreads;
-    public int pulsarConnectionsPreBroker;
-    /*
-     * for pulsar client
-     */
-    private Map<String, String> pulsarUrl2token;
-    private String authType;
-    /*
-     * for producer
-     */
-    /*
-     * unit mills
-     */
-    private Integer sendTimeout;
-    private Integer clientTimeout;
-    private boolean enableBatch = true;
-    private boolean blockIfQueueFull = true;
-    private int maxPendingMessages = 10000;
-    private int maxPendingMessagesAcrossPartitions = 500000;
-    private CompressionType compressionType;
-    private int maxBatchingBytes = 128 * 1024;
-    private int maxBatchingMessages = 1000;
-    private long maxBatchingPublishDelayMillis = 1;
-    private long retryIntervalWhenSendMsgError = 30 * 1000L;
-    private int sinkThreadPoolSize;
-
-    /**
-     * PulsarClientService
-     *
-     * @param pulsarConfig
-     */
-    public PulsarClientService(MQClusterConfig pulsarConfig, int sinkThreadPoolSize) {
-        this.pulsarConfig = pulsarConfig;
-        this.sinkThreadPoolSize = sinkThreadPoolSize;
-
-        authType = pulsarConfig.getAuthType();
-        sendTimeout = pulsarConfig.getSendTimeoutMs();
-        retryIntervalWhenSendMsgError = pulsarConfig.getRetryIntervalWhenSendErrorMs();
-        clientTimeout = pulsarConfig.getClientTimeoutSecond();
-
-        Preconditions.checkArgument(sendTimeout > 0, "sendTimeout must be > 0");
-
-        pulsarClientIoThreads = pulsarConfig.getPulsarClientIoThreads();
-        pulsarConnectionsPreBroker = pulsarConfig.getPulsarConnectionsPreBroker();
-
-        enableBatch = pulsarConfig.getEnableBatch();
-        blockIfQueueFull = pulsarConfig.getBlockIfQueueFull();
-        maxPendingMessages = pulsarConfig.getMaxPendingMessages();
-        maxPendingMessagesAcrossPartitions = pulsarConfig.getMaxPendingMessagesAcrossPartitions();
-        String compressionTypeStr = pulsarConfig.getCompressionType();
-        if (StringUtils.isNotEmpty(compressionTypeStr)) {
-            compressionType = CompressionType.valueOf(compressionTypeStr);
-        } else {
-            compressionType = CompressionType.NONE;
-        }
-        maxBatchingMessages = pulsarConfig.getMaxBatchingMessages();
-        maxBatchingBytes = pulsarConfig.getMaxBatchingBytes();
-        maxBatchingPublishDelayMillis = pulsarConfig.getMaxBatchingPublishDelayMillis();
-        producerInfoMap = new ConcurrentHashMap<>();
-        topicSendIndexMap = new ConcurrentHashMap<>();
-    }
-
-    public void initCreateConnection(CreatePulsarClientCallBack callBack, String sinkName) {
-        pulsarUrl2token = ConfigManager.getInstance().getMqClusterUrl2Token();
-        if (pulsarUrl2token == null || pulsarUrl2token.isEmpty()) {
-            logger.warn("failed to get Pulsar Cluster, make sure register pulsar to manager successfully.");
-            return;
-        }
-        try {
-            createConnection(callBack);
-            if (!ConfigManager.getInstance().isMqClusterReady()) {
-                ConfigManager.getInstance().updMqClusterStatus(true);
-                logger.info("[{}] MQ Cluster service status ready!", sinkName);
-            }
-        } catch (FlumeException e) {
-            logger.error("unable to create pulsar client: ", e);
-            close();
-        }
-    }
-
-    /**
-     * send message
-     */
-    public boolean sendMessage(int poolIndex, String topic,
-            EventStat es, PulsarSink pulsarSink) {
-        boolean result;
-        TopicProducerInfo producerInfo = null;
-        Event event = es.getEvent();
-        final String pkgVersion =
-                event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER);
-        final String inlongGroupId =
-                event.getHeaders().get(AttributeConstants.GROUP_ID);
-        final String inlongStreamId =
-                event.getHeaders().get(AttributeConstants.STREAM_ID);
-        String errMsg = "";
-        try {
-            producerInfo = getProducerInfo(poolIndex, topic, inlongGroupId, inlongStreamId);
-        } catch (Exception e) {
-            errMsg = "Get producer failed for topic=" + topic + ", reason is " + e.getMessage();
-        }
-        /*
-         * If the producer is a null value,\ it means that the topic is not yet ready, and it needs to be played back
-         * into the file channel
-         */
-        if (producerInfo == null) {
-            /*
-             * Data within 30s is placed in the exception channel to prevent frequent checks After 30s, reopen the topic
-             * check, if it is still a null value, put it back into the illegal map
-             */
-            pulsarSink.handleRequestProcError(topic, es,
-                    false, DataProxyErrCode.NO_AVAILABLE_PRODUCER, errMsg);
-            return false;
-        }
-        TopicProducerInfo forCallBackP = producerInfo;
-        Producer producer = producerInfo.getProducer(poolIndex);
-        if (producer == null) {
-            errMsg = "get producer is null! topic = " + topic;
-            pulsarSink.handleRequestProcError(topic, es,
-                    false, DataProxyErrCode.PRODUCER_IS_NULL, errMsg);
-            return false;
-        }
-        // build and send message
-        Map<String, String> proMap =
-                MessageUtils.getXfsAttrs(event.getHeaders(), pkgVersion);
-        long startTime = System.currentTimeMillis();
-        pulsarSink.getCurrentInFlightCount().incrementAndGet();
-        if (es.isOrderMessage()) {
-            try {
-                String partitionKey =
-                        event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
-                MessageId msgId = producer.newMessage()
-                        .properties(proMap)
-                        .key(partitionKey)
-                        .value(event.getBody())
-                        .send();
-                pulsarSink.handleMessageSendSuccess(topic, msgId, es, startTime);
-                forCallBackP.setCanUseSend(true);
-                result = true;
-            } catch (Throwable ex) {
-                forCallBackP.setCanUseSend(false);
-                pulsarSink.handleMessageSendException(topic, es, ex,
-                        DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
-                result = ex instanceof NotFoundException;
-            }
-        } else {
-            try {
-                producer.newMessage().properties(proMap)
-                        .value(event.getBody())
-                        .sendAsync()
-                        .thenAccept((msgId) -> {
-                            forCallBackP.setCanUseSend(true);
-                            pulsarSink.handleMessageSendSuccess(topic, msgId, es, startTime);
-                        })
-                        .exceptionally((e) -> {
-                            forCallBackP.setCanUseSend(false);
-                            pulsarSink.handleMessageSendException(topic, es, e,
-                                    DataProxyErrCode.MQ_RETURN_ERROR, e.toString());
-                            return null;
-                        });
-            } catch (Throwable ex) {
-                pulsarSink.getCurrentInFlightCount().decrementAndGet();
-                pulsarSink.handleRequestProcError(topic, es, true,
-                        DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, ex.getMessage());
-            }
-            result = true;
-        }
-        return result;
-    }
-
-    /**
-     * If this function is called successively without calling {@see #destroyConnection()}, only the
-     * first call has any effect.
-     *
-     * @throws FlumeException if an RPC client connection could not be opened
-     */
-    private void createConnection(CreatePulsarClientCallBack callBack) throws FlumeException {
-        if (!pulsarClients.isEmpty()) {
-            return;
-        }
-        logger.debug("number of pulsar cluster is {}", pulsarUrl2token.size());
-        for (Map.Entry<String, String> info : pulsarUrl2token.entrySet()) {
-            try {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("url = {}, token = {}", info.getKey(), info.getValue());
-                }
-                PulsarClient client = initPulsarClient(info.getKey(), info.getValue());
-                pulsarClients.put(info.getKey(), client);
-                callBack.handleCreateClientSuccess(info.getKey());
-            } catch (PulsarClientException e) {
-                callBack.handleCreateClientException(info.getKey());
-                logger.error("create connection error in Pulsar sink, "
-                        + "maybe pulsar master set error, please re-check. url " + info.getKey(), e);
-            } catch (Throwable e) {
-                callBack.handleCreateClientException(info.getKey());
-                logger.error("create connection error in pulsar sink, "
-                        + "maybe pulsar master set error/shutdown in progress, please "
-                        + "re-check. url " + info.getKey(), e);
-            }
-        }
-        if (pulsarClients.isEmpty()) {
-            throw new FlumeException("connect to pulsar error, maybe zkstr/zkroot set error, please re-check");
-        }
-    }
-
-    private PulsarClient initPulsarClient(String pulsarUrl, String token) throws Exception {
-        ClientBuilder builder = PulsarClient.builder();
-        if (MQClusterConfig.PULSAR_DEFAULT_AUTH_TYPE.equals(authType) && StringUtils.isNotEmpty(token)) {
-            builder.authentication(AuthenticationFactory.token(token));
-        }
-        builder.serviceUrl(pulsarUrl)
-                .ioThreads(pulsarClientIoThreads)
-                .connectionsPerBroker(pulsarConnectionsPreBroker)
-                .connectionTimeout(clientTimeout, TimeUnit.SECONDS)
-                .statsInterval(pulsarConfig.getStatIntervalSec(),
-                        TimeUnit.SECONDS);
-        return builder.build();
-    }
-
-    /**
-     * Producer initialization.
-     */
-    public List<TopicProducerInfo> initTopicProducer(String topic, String inlongGroupId,
-            String inlongStreamId) {
-        List<TopicProducerInfo> producerInfoList = producerInfoMap.computeIfAbsent(topic, (k) -> {
-            List<TopicProducerInfo> newList = new ArrayList<>();
-            for (PulsarClient pulsarClient : pulsarClients.values()) {
-                TopicProducerInfo info = new TopicProducerInfo(pulsarClient,
-                        sinkThreadPoolSize, topic);
-                info.initProducer(inlongGroupId, inlongStreamId);
-                if (info.isCanUseToSendMessage()) {
-                    newList.add(info);
-                }
-            }
-            if (newList.size() == 0) {
-                newList = null;
-            }
-            return newList;
-        });
-        return producerInfoList;
-    }
-
-    public List<TopicProducerInfo> initTopicProducer(String topic) {
-        return initTopicProducer(topic, null, null);
-    }
-
-    public boolean destroyProducerByTopic(String topic) {
-        List<TopicProducerInfo> producerInfoList = producerInfoMap.remove(topic);
-        if (producerInfoList == null || producerInfoList.isEmpty()) {
-            return true;
-        }
-        for (TopicProducerInfo producerInfo : producerInfoList) {
-            if (producerInfo != null) {
-                producerInfo.close();
-                logger.info("destroy producer for topic={}", topic);
-            }
-        }
-        return true;
-    }
-
-    private TopicProducerInfo getProducerInfo(int poolIndex, String topic, String inlongGroupId,
-            String inlongStreamId) {
-        List<TopicProducerInfo> producerList = initTopicProducer(topic, inlongGroupId, inlongStreamId);
-        AtomicLong topicIndex = topicSendIndexMap.computeIfAbsent(topic, (k) -> new AtomicLong(0));
-        int maxTryToGetProducer = producerList == null ? 0 : producerList.size();
-        if (maxTryToGetProducer == 0) {
-            return null;
-        }
-        int retryTime = 0;
-        TopicProducerInfo p;
-        do {
-            int index = (int) (topicIndex.getAndIncrement() % maxTryToGetProducer);
-            p = producerList.get(index);
-            if (p.isCanUseToSendMessage() && p.getProducer(poolIndex) != null
-                    && p.getProducer(poolIndex).isConnected()) {
-                break;
-            }
-            retryTime++;
-        } while (retryTime < maxTryToGetProducer);
-        return p;
-    }
-
-    public Map<String, List<TopicProducerInfo>> getProducerInfoMap() {
-        return producerInfoMap;
-    }
-
-    private void destroyConnection() {
-        producerInfoMap.clear();
-        for (PulsarClient pulsarClient : pulsarClients.values()) {
-            try {
-                pulsarClient.shutdown();
-            } catch (Exception e) {
-                logger.error("destroy pulsarClient error in PulsarSink: ", e);
-            }
-        }
-        pulsarClients.clear();
-        logger.debug("closed meta producer");
-    }
-
-    private void removeProducers(PulsarClient pulsarClient) {
-        for (List<TopicProducerInfo> producers : producerInfoMap.values()) {
-            if (producers == null || producers.isEmpty()) {
-                continue;
-            }
-            Iterator<TopicProducerInfo> it = producers.iterator();
-            while (it.hasNext()) {
-                TopicProducerInfo entry = it.next();
-                if (entry.getPulsarClient().equals(pulsarClient)) {
-                    entry.close();
-                    it.remove();
-                }
-            }
-        }
-    }
-
-    /**
-     * close pulsarClients(the related url is removed); start pulsarClients for new url, and create producers for them
-     *
-     * @param callBack callback
-     * @param needToClose url-token map
-     * @param needToStart url-token map
-     * @param topicSet for new pulsarClient, create these topics' producers
-     */
-    public void updatePulsarClients(CreatePulsarClientCallBack callBack, Map<String, String> needToClose,
-            Map<String, String> needToStart, Set<String> topicSet) {
-        // close
-        for (String url : needToClose.keySet()) {
-            PulsarClient pulsarClient = pulsarClients.get(url);
-            if (pulsarClient != null) {
-                try {
-                    removeProducers(pulsarClient);
-                    pulsarClient.shutdown();
-                    pulsarClients.remove(url);
-                } catch (Exception e) {
-                    logger.error("shutdown pulsarClient error in PulsarSink: ", e);
-                }
-            }
-        }
-        for (Map.Entry<String, String> entry : needToStart.entrySet()) {
-            String url = entry.getKey();
-            String token = entry.getValue();
-            try {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("url = {}, token = {}", url, token);
-                }
-                PulsarClient client = initPulsarClient(url, token);
-                pulsarClients.put(url, client);
-                callBack.handleCreateClientSuccess(url);
-
-                // create related topicProducers
-                for (String topic : topicSet) {
-                    TopicProducerInfo info = new TopicProducerInfo(client, sinkThreadPoolSize,
-                            topic);
-                    info.initProducer();
-                    if (info.isCanUseToSendMessage()) {
-                        List<TopicProducerInfo> producerInfos = producerInfoMap.get(topic);
-                        if (producerInfos == null) {
-                            List<TopicProducerInfo> tmpProdInfos = new ArrayList<>();
-                            producerInfos = producerInfoMap.putIfAbsent(topic, tmpProdInfos);
-                            if (producerInfos == null) {
-                                producerInfos = tmpProdInfos;
-                            }
-                        }
-                        producerInfos.add(info);
-                    }
-                }
-            } catch (PulsarClientException e) {
-                callBack.handleCreateClientException(url);
-                logger.error("create connection error in pulsar sink, "
-                        + "maybe pulsar master set error, please re-check.url " + url, e);
-            } catch (Throwable e) {
-                callBack.handleCreateClientException(url);
-                logger.error("create connection error in pulsar sink, "
-                        + "maybe pulsar master set error/shutdown in progress, please "
-                        + "re-check. url " + url, e);
-            }
-        }
-    }
-
-    /**
-     * get inlong stream id from event
-     *
-     * @param event event
-     * @return inlong stream id
-     */
-    private String getInlongStreamId(Event event) {
-        String streamId = "";
-        if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
-            streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
-        } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
-            streamId = event.getHeaders().get(AttributeConstants.INAME);
-        }
-        return streamId;
-    }
-
-    /**
-     * get inlong group id from event
-     *
-     * @param event event
-     * @return inlong group id
-     */
-    private String getInlongGroupId(Event event) {
-        return event.getHeaders().get(AttributeConstants.GROUP_ID);
-    }
-
-    public void close() {
-        destroyConnection();
-    }
-
-    class TopicProducerInfo {
-
-        private final Producer[] producers;
-        private final PulsarClient pulsarClient;
-        private final int sinkThreadPoolSize;
-        private final String topic;
-        private long lastSendMsgErrorTime;
-        private volatile Boolean isCanUseSend = true;
-
-        private volatile Boolean isFinishInit = false;
-
-        public TopicProducerInfo(PulsarClient pulsarClient, int sinkThreadPoolSize, String topic) {
-            this.pulsarClient = pulsarClient;
-            this.sinkThreadPoolSize = sinkThreadPoolSize;
-            this.topic = topic;
-            this.producers = new Producer[sinkThreadPoolSize];
-        }
-
-        public void initProducer() {
-            initProducer(null, null);
-        }
-
-        public void initProducer(String inlongGroupId, String inlongStreamId) {
-            try {
-                for (int i = 0; i < sinkThreadPoolSize; i++) {
-                    producers[i] = createProducer();
-                }
-                isFinishInit = true;
-            } catch (PulsarClientException e) {
-                logger.error("create pulsar client has error, topic = {}, inlongGroupId = {}, inlongStreamId= {}",
-                        topic, inlongGroupId, inlongStreamId, e);
-                isFinishInit = false;
-                for (int i = 0; i < sinkThreadPoolSize; i++) {
-                    if (producers[i] != null) {
-                        producers[i].closeAsync();
-                    }
-                }
-            }
-        }
-
-        private Producer createProducer() throws PulsarClientException {
-            return pulsarClient.newProducer().sendTimeout(sendTimeout, TimeUnit.MILLISECONDS)
-                    .topic(topic)
-                    .enableBatching(enableBatch)
-                    .blockIfQueueFull(blockIfQueueFull)
-                    .maxPendingMessages(maxPendingMessages)
-                    .maxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions)
-                    .compressionType(compressionType)
-                    .batchingMaxMessages(maxBatchingMessages)
-                    .batchingMaxBytes(maxBatchingBytes)
-                    .batchingMaxPublishDelay(maxBatchingPublishDelayMillis, TimeUnit.MILLISECONDS)
-                    .create();
-        }
-
-        public void setCanUseSend(Boolean isCanUseSend) {
-            this.isCanUseSend = isCanUseSend;
-            if (!isCanUseSend) {
-                lastSendMsgErrorTime = System.currentTimeMillis();
-            }
-        }
-
-        public boolean isCanUseToSendMessage() {
-            if (isCanUseSend && isFinishInit) {
-                return true;
-            } else if (isFinishInit
-                    && (System.currentTimeMillis() - lastSendMsgErrorTime) > retryIntervalWhenSendMsgError) {
-                lastSendMsgErrorTime = System.currentTimeMillis();
-                return true;
-            }
-            return false;
-        }
-
-        public void close() {
-            try {
-                for (int i = 0; i < sinkThreadPoolSize; i++) {
-                    if (producers[i] != null) {
-                        producers[i].close();
-                    }
-                }
-            } catch (PulsarClientException e) {
-                logger.error("close pulsar producer has error: ", e);
-            }
-        }
-
-        public Producer getProducer(int poolIndex) {
-            if (poolIndex >= sinkThreadPoolSize || producers[poolIndex] == null) {
-                return producers[0];
-            }
-            return producers[poolIndex];
-        }
-
-        public PulsarClient getPulsarClient() {
-            return pulsarClient;
-        }
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
deleted file mode 100644
index bd1d9cd29..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar;
-
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.dataproxy.sink.EventStat;
-
-public interface SendMessageCallBack {
-
-    void handleMessageSendSuccess(String topic, Object msgId, EventStat es, long startTime);
-
-    void handleRequestProcError(String topic, EventStat es,
-            boolean needRetry, DataProxyErrCode errCode, String errMsg);
-
-    void handleMessageSendException(String topic, EventStat es, Object exception,
-            DataProxyErrCode errCode, String errMsg);
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
deleted file mode 100644
index c97e45c70..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar;
-
-import com.google.common.cache.LoadingCache;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flume.Event;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.monitor.LogCounter;
-import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.sink.EventStat;
-import org.apache.inlong.dataproxy.sink.PulsarSink;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SinkTask extends Thread {
-
-    private static final Logger logger = LoggerFactory.getLogger(SinkTask.class);
-
-    private static final LogCounter logPrinterA = new LogCounter(10, 100000, 60 * 1000);
-
-    /*
-     * default value
-     */
-    private static int BATCH_SIZE = 10000;
-
-    private PulsarClientService pulsarClientService;
-
-    private PulsarSink pulsarSink;
-
-    private long logCounter = 0;
-
-    private int poolIndex = 0;
-
-    private LinkedBlockingQueue<EventStat> eventQueue;
-
-    private LinkedBlockingQueue<EventStat> resendQueue;
-
-    private AtomicLong currentInFlightCount;
-
-    private SinkCounter sinkCounter;
-
-    private LoadingCache<String, Long> agentIdCache;
-
-    private MQClusterConfig pulsarConfig;
-
-    private int maxRetrySendCnt;
-    /*
-     * whether the SendTask thread can send data to pulsar
-     */
-    private volatile boolean canSend = false;
-
-    public SinkTask(PulsarClientService pulsarClientService, PulsarSink pulsarSink,
-            int eventQueueSize,
-            int badEventQueueSize, int poolIndex, boolean canSend) {
-        this.pulsarClientService = pulsarClientService;
-        this.pulsarSink = pulsarSink;
-        this.poolIndex = poolIndex;
-        this.canSend = canSend;
-        this.currentInFlightCount = pulsarSink.getCurrentInFlightCount();
-        this.sinkCounter = pulsarSink.getSinkCounter();
-        this.agentIdCache = pulsarSink.getAgentIdCache();
-        this.pulsarConfig = pulsarSink.getPulsarConfig();
-        this.maxRetrySendCnt = pulsarSink.getMaxRetrySendCnt();
-        eventQueue = new LinkedBlockingQueue<>(eventQueueSize);
-        resendQueue = new LinkedBlockingQueue<>(badEventQueueSize);
-    }
-
-    public boolean processEvent(EventStat eventStat) {
-        try {
-            return eventQueue.offer(eventStat, 3 * 1000, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            logger.error("InterruptedException e", e);
-        }
-        return false;
-    }
-
-    public boolean processReSendEvent(EventStat eventStat) {
-        return resendQueue.offer(eventStat);
-    }
-
-    public boolean isAllSendFinished() {
-        return eventQueue.size() == 0;
-    }
-
-    public void close() {
-        canSend = false;
-    }
-
-    @Override
-    public void run() {
-        logger.info("Sink task {} started.", Thread.currentThread().getName());
-        while (canSend) {
-            Event event = null;
-            EventStat eventStat = null;
-            String topic = null;
-            try {
-                if (!resendQueue.isEmpty()) {
-                    /*
-                     * Send the data in the retry queue first
-                     */
-                    eventStat = resendQueue.poll();
-                    if (eventStat != null) {
-                        event = eventStat.getEvent();
-                    }
-                } else {
-                    if (currentInFlightCount.get() > BATCH_SIZE) {
-                        /*
-                         * Under the condition that the number of unresponsive messages is greater than 1w, the number
-                         * of unresponsive messages sent to pulsar will be printed periodically
-                         */
-                        logCounter++;
-                        if (logCounter == 1 || logCounter % 100000 == 0) {
-                            logger.info(getName()
-                                    + " currentInFlightCount={} resendQueue"
-                                    + ".size={}",
-                                    currentInFlightCount.get(), resendQueue.size());
-                        }
-                        if (logCounter > Long.MAX_VALUE - 10) {
-                            logCounter = 0;
-                        }
-                    }
-                    eventStat = eventQueue.take();
-                    sinkCounter.incrementEventDrainAttemptCount();
-                    event = eventStat.getEvent();
-                }
-                // check event status
-                if (event == null) {
-                    logger.warn("Event is null!");
-                    continue;
-                }
-                // check whether discard or send event
-                if (eventStat.getRetryCnt() > maxRetrySendCnt) {
-                    logger.warn("Message will be discard! send times reach to max retry cnt."
-                            + " max retry cnt = {}", maxRetrySendCnt);
-                    continue;
-                }
-                // get topic
-                topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
-                if (StringUtils.isEmpty(topic)) {
-                    String groupId = event.getHeaders().get(AttributeConstants.GROUP_ID);
-                    String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
-                    topic = MessageUtils.getTopic(pulsarSink.getTopicsProperties(), groupId, streamId);
-                }
-                if (topic == null || topic.equals("")) {
-                    pulsarSink.handleRequestProcError(topic, eventStat,
-                            false, DataProxyErrCode.TOPIC_IS_BLANK, "");
-                    continue;
-                }
-                // check whether duplicated event
-                String clientSeqId = event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
-                if (pulsarConfig.getClientIdCache() && clientSeqId != null) {
-                    boolean hasSend = agentIdCache.asMap().containsKey(clientSeqId);
-                    agentIdCache.put(clientSeqId, System.currentTimeMillis());
-                    if (hasSend) {
-                        pulsarSink.handleRequestProcError(topic, eventStat,
-                                false, DataProxyErrCode.DUPLICATED_MESSAGE,
-                                "Duplicated message by uuid = " + clientSeqId);
-                        if (logPrinterA.shouldPrint()) {
-                            logger.info("{} agent package {} existed,just discard.",
-                                    getName(), clientSeqId);
-                        }
-                        continue;
-                    }
-                }
-                // send message
-                pulsarClientService.sendMessage(poolIndex, topic, eventStat, pulsarSink);
-            } catch (InterruptedException e) {
-                logger.error("Thread {} has been interrupted!",
-                        Thread.currentThread().getName());
-                return;
-            } catch (Throwable t) {
-                if (t instanceof PulsarClientException) {
-                    String message = t.getMessage();
-                    if (message != null && (message.contains("No available queue for topic")
-                            || message.contains("The brokers of topic are all forbidden"))) {
-                        logger.info("IllegalTopicMap.put " + topic);
-                        continue;
-                    } else {
-                        try {
-                            /*
-                             * The exception of pulsar will cause the sending thread to block and prevent further
-                             * pressure on pulsar. Here you should pay attention to the type of exception to prevent the
-                             * error of a topic from affecting the global
-                             */
-                            Thread.sleep(100);
-                        } catch (InterruptedException e) {
-                            // ignore..
-                        }
-                    }
-                }
-                if (logPrinterA.shouldPrint()) {
-                    logger.error("Sink task fail to send the message, sink.name="
-                            + Thread.currentThread().getName()
-                            + ",event.headers="
-                            + eventStat.getEvent().getHeaders(), t);
-                }
-                /*
-                 * producer.sendMessage is abnormal, so currentInFlightCount is not added, so there is no need to
-                 * subtract
-                 */
-                pulsarSink.handleRequestProcError(topic, eventStat, false,
-                        DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, t.getMessage());
-            }
-        }
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java
deleted file mode 100644
index 317da28f5..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * 
- * PulsarSetSink
- */
-public class PulsarFederationSink extends AbstractSink implements Configurable {
-
-    public static final Logger LOG = LoggerFactory.getLogger(PulsarFederationSink.class);
-
-    private PulsarFederationSinkContext context;
-    private List<PulsarFederationWorker> workers = new ArrayList<>();
-    private Map<String, String> dimensions;
-
-    /**
-     * start
-     */
-    @Override
-    public void start() {
-        String sinkName = this.getName();
-        // create worker
-        for (int i = 0; i < context.getMaxThreads(); i++) {
-            PulsarFederationWorker worker = new PulsarFederationWorker(sinkName, i, context);
-            worker.start();
-            this.workers.add(worker);
-        }
-        super.start();
-    }
-
-    /**
-     * stop
-     */
-    @Override
-    public void stop() {
-        for (PulsarFederationWorker worker : workers) {
-            try {
-                worker.close();
-            } catch (Throwable e) {
-                LOG.error(e.getMessage(), e);
-            }
-        }
-        this.context.close();
-        super.stop();
-    }
-
-    /**
-     * configure
-     * 
-     * @param context
-     */
-    @Override
-    public void configure(Context context) {
-        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
-        this.context = new PulsarFederationSinkContext(this.getName(), context);
-        this.dimensions = new HashMap<>();
-        this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.context.getProxyClusterId());
-        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
-    }
-
-    /**
-     * process
-     * 
-     * @return                        Status
-     * @throws EventDeliveryException
-     */
-    @Override
-    public Status process() throws EventDeliveryException {
-        Channel channel = getChannel();
-        Transaction tx = channel.getTransaction();
-        tx.begin();
-        try {
-            Event event = channel.take();
-            if (event == null) {
-                tx.commit();
-                return Status.BACKOFF;
-            }
-            //
-            int eventSize = event.getBody().length;
-            if (!this.context.getBufferQueue().tryAcquire(eventSize)) {
-                // record the failure of queue full for monitor
-                // metric
-                DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
-                metricItem.readFailCount.incrementAndGet();
-                metricItem.readFailSize.addAndGet(eventSize);
-                //
-                tx.rollback();
-                return Status.BACKOFF;
-            }
-            this.context.getBufferQueue().offer(event);
-            tx.commit();
-            return Status.READY;
-        } catch (Throwable t) {
-            LOG.error("Process event failed!" + this.getName(), t);
-            try {
-                tx.rollback();
-                // metric
-                DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
-                metricItem.readFailCount.incrementAndGet();
-            } catch (Throwable e) {
-                LOG.error("Channel take transaction rollback exception:" + getName(), e);
-            }
-            return Status.BACKOFF;
-        } finally {
-            tx.close();
-        }
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSinkContext.java
deleted file mode 100644
index f57e4bbe0..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSinkContext.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import java.util.Map;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.apache.inlong.dataproxy.utils.BufferQueue;
-
-/**
- * 
- * PulsarFederationContext
- */
-public class PulsarFederationSinkContext {
-
-    public static final String KEY_MAX_THREADS = "max-threads";
-    public static final String KEY_MAXTRANSACTION = "maxTransaction";
-    public static final String KEY_PROCESSINTERVAL = "processInterval";
-    public static final String KEY_RELOADINTERVAL = "reloadInterval";
-    public static final String KEY_MAXBUFFERQUEUESIZE = "maxBufferQueueSize";
-    public static final String PREFIX_PRODUCER = "producer.";
-
-    private final String proxyClusterId;
-    private final Context sinkContext;
-    private final Context producerContext;
-    //
-    private final IdTopicConfigHolder idTopicHolder;
-    private final CacheClusterConfigHolder cacheHolder;
-    private final BufferQueue<Event> bufferQueue;
-    //
-    private final int maxThreads;
-    private final int maxTransaction;
-    private final long processInterval;
-    private final long reloadInterval;
-    //
-    private final DataProxyMetricItemSet metricItemSet;
-
-    /**
-     * Constructor
-     * 
-     * @param context
-     */
-    public PulsarFederationSinkContext(String sinkName, Context context) {
-        this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
-        this.sinkContext = context;
-        this.maxThreads = context.getInteger(KEY_MAX_THREADS, 10);
-        this.maxTransaction = context.getInteger(KEY_MAXTRANSACTION, 1);
-        this.processInterval = context.getInteger(KEY_PROCESSINTERVAL, 100);
-        this.reloadInterval = context.getLong(KEY_RELOADINTERVAL, 60000L);
-        //
-        this.idTopicHolder = new IdTopicConfigHolder();
-        this.idTopicHolder.configure(context);
-        this.idTopicHolder.start();
-        //
-        this.cacheHolder = new CacheClusterConfigHolder();
-        this.cacheHolder.configure(context);
-        this.cacheHolder.start();
-        //
-        int maxBufferQueueSize = context.getInteger(KEY_MAXBUFFERQUEUESIZE, 128 * 1024);
-        this.bufferQueue = new BufferQueue<Event>(maxBufferQueueSize);
-        //
-        Map<String, String> producerParams = context.getSubProperties(PREFIX_PRODUCER);
-        this.producerContext = new Context(producerParams);
-        //
-        this.metricItemSet = new DataProxyMetricItemSet(sinkName);
-        MetricRegister.register(this.metricItemSet);
-    }
-
-    /**
-     * close
-     */
-    public void close() {
-        this.idTopicHolder.close();
-        this.cacheHolder.close();
-    }
-
-    /**
-     * get proxyClusterId
-     * 
-     * @return the proxyClusterId
-     */
-    public String getProxyClusterId() {
-        return proxyClusterId;
-    }
-
-    /**
-     * get sinkContext
-     * 
-     * @return the sinkContext
-     */
-    public Context getSinkContext() {
-        return sinkContext;
-    }
-
-    /**
-     * get producerContext
-     * 
-     * @return the producerContext
-     */
-    public Context getProducerContext() {
-        return producerContext;
-    }
-
-    /**
-     * get idTopicHolder
-     * 
-     * @return the idTopicHolder
-     */
-    public IdTopicConfigHolder getIdTopicHolder() {
-        return idTopicHolder;
-    }
-
-    /**
-     * get cacheHolder
-     * 
-     * @return the cacheHolder
-     */
-    public CacheClusterConfigHolder getCacheHolder() {
-        return cacheHolder;
-    }
-
-    /**
-     * get bufferQueue
-     * 
-     * @return the bufferQueue
-     */
-    public BufferQueue<Event> getBufferQueue() {
-        return bufferQueue;
-    }
-
-    /**
-     * get maxThreads
-     * 
-     * @return the maxThreads
-     */
-    public int getMaxThreads() {
-        return maxThreads;
-    }
-
-    /**
-     * get maxTransaction
-     * 
-     * @return the maxTransaction
-     */
-    public int getMaxTransaction() {
-        return maxTransaction;
-    }
-
-    /**
-     * get processInterval
-     * 
-     * @return the processInterval
-     */
-    public long getProcessInterval() {
-        return processInterval;
-    }
-
-    /**
-     * get reloadInterval
-     * 
-     * @return the reloadInterval
-     */
-    public long getReloadInterval() {
-        return reloadInterval;
-    }
-
-    /**
-     * get metricItemSet
-     * 
-     * @return the metricItemSet
-     */
-    public DataProxyMetricItemSet getMetricItemSet() {
-        return metricItemSet;
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
deleted file mode 100644
index b8070176a..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flume.Event;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
-import org.apache.inlong.dataproxy.utils.Constants;
-import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * 
- * PulsarSetWorker
- */
-public class PulsarFederationWorker extends Thread {
-
-    public static final Logger LOG = LoggerFactory.getLogger(PulsarFederationWorker.class);
-
-    private final String workerName;
-    private final PulsarFederationSinkContext context;
-
-    private PulsarProducerFederation producerFederation;
-    private LifecycleState status;
-    private Map<String, String> dimensions;
-
-    /**
-     * Constructor
-     * 
-     * @param sinkName
-     * @param workerIndex
-     * @param context
-     */
-    public PulsarFederationWorker(String sinkName, int workerIndex, PulsarFederationSinkContext context) {
-        super();
-        this.workerName = sinkName + "-worker-" + workerIndex;
-        this.context = context;
-        this.producerFederation = new PulsarProducerFederation(workerName, this.context);
-        this.status = LifecycleState.IDLE;
-        this.dimensions = new HashMap<>();
-        this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.context.getProxyClusterId());
-        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, sinkName);
-    }
-
-    /**
-     * start
-     */
-    @Override
-    public void start() {
-        this.producerFederation.start();
-        this.status = LifecycleState.START;
-        super.start();
-    }
-
-    /**
-     * 
-     * close
-     */
-    public void close() {
-        // close all producers
-        this.producerFederation.close();
-        this.status = LifecycleState.STOP;
-    }
-
-    /**
-     * run
-     */
-    @Override
-    public void run() {
-        LOG.info(String.format("start PulsarSetWorker:%s", this.workerName));
-        while (status != LifecycleState.STOP) {
-            try {
-                Event currentRecord = context.getBufferQueue().pollRecord();
-                if (currentRecord == null) {
-                    Thread.sleep(context.getProcessInterval());
-                    continue;
-                }
-                // fill topic
-                this.fillTopic(currentRecord);
-                // metric
-                DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
-                this.dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
-                        currentRecord.getHeaders().get(Constants.TOPIC));
-                long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
-                        System.currentTimeMillis());
-                long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
-                dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
-                DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
-                metricItem.sendCount.incrementAndGet();
-                metricItem.sendSize.addAndGet(currentRecord.getBody().length);
-                // send
-                this.producerFederation.send(currentRecord);
-            } catch (Throwable e) {
-                LOG.error(e.getMessage(), e);
-                this.sleepOneInterval();
-            }
-        }
-    }
-
-    /**
-     * fillTopic
-     * 
-     * @param currentRecord
-     */
-    private void fillTopic(Event currentRecord) {
-        Map<String, String> headers = currentRecord.getHeaders();
-        String inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
-        String inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
-        String uid = IdTopicConfig.generateUid(inlongGroupId, inlongStreamId);
-        String topic = this.context.getIdTopicHolder().getTopic(uid);
-        if (!StringUtils.isBlank(topic)) {
-            headers.put(Constants.TOPIC, topic);
-        }
-    }
-
-    /**
-     * sleepOneInterval
-     */
-    private void sleepOneInterval() {
-        try {
-            Thread.sleep(context.getProcessInterval());
-        } catch (InterruptedException e1) {
-            LOG.error(e1.getMessage(), e1);
-        }
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
deleted file mode 100644
index 8175f7bc9..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import java.security.SecureRandom;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
-import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
-import org.apache.inlong.dataproxy.utils.Constants;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageRoutingMode;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerAccessMode;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SizeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * 
- * PulsarProducerCluster
- */
-public class PulsarProducerCluster implements LifecycleAware {
-
-    public static final Logger LOG = LoggerFactory.getLogger(PulsarProducerCluster.class);
-
-    public static final String KEY_SERVICE_URL = "serviceUrl";
-    public static final String KEY_AUTHENTICATION = "authentication";
-    public static final String KEY_STATS_INTERVAL_SECONDS = "statsIntervalSeconds";
-
-    public static final String KEY_ENABLEBATCHING = "enableBatching";
-    public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes";
-    public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages";
-    public static final String KEY_BATCHINGMAXPUBLISHDELAY = "batchingMaxPublishDelay";
-    public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages";
-    public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS = "maxPendingMessagesAcrossPartitions";
-    public static final String KEY_SENDTIMEOUT = "sendTimeout";
-    public static final String KEY_COMPRESSIONTYPE = "compressionType";
-    public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull";
-    public static final String KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouter"
-            + "BatchingPartitionSwitchFrequency";
-
-    public static final String KEY_IOTHREADS = "ioThreads";
-    public static final String KEY_MEMORYLIMIT = "memoryLimit";
-    public static final String KEY_CONNECTIONSPERBROKER = "connectionsPerBroker";
-
-    private final String workerName;
-    private final CacheClusterConfig config;
-    private final PulsarFederationSinkContext sinkContext;
-    private final Context context;
-    private final String cacheClusterName;
-    private LifecycleState state;
-
-    /**
-     * pulsar client
-     */
-    private PulsarClient client;
-    private ProducerBuilder<byte[]> baseBuilder;
-
-    private Map<String, Producer<byte[]>> producerMap = new ConcurrentHashMap<>();
-
-    /**
-     * Constructor
-     * 
-     * @param workerName
-     * @param config
-     * @param context
-     */
-    public PulsarProducerCluster(String workerName, CacheClusterConfig config, PulsarFederationSinkContext context) {
-        this.workerName = workerName;
-        this.config = config;
-        this.sinkContext = context;
-        this.context = context.getProducerContext();
-        this.state = LifecycleState.IDLE;
-        this.cacheClusterName = config.getClusterName();
-    }
-
-    /**
-     * start
-     */
-    @Override
-    public void start() {
-        this.state = LifecycleState.START;
-        // create pulsar client
-        try {
-            String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
-            String authentication = config.getParams().get(KEY_AUTHENTICATION);
-            this.client = PulsarClient.builder()
-                    .serviceUrl(serviceUrl)
-                    .authentication(AuthenticationFactory.token(authentication))
-                    .ioThreads(context.getInteger(KEY_IOTHREADS, 1))
-                    .memoryLimit(context.getLong(KEY_MEMORYLIMIT, 1073741824L), SizeUnit.BYTES)
-                    .connectionsPerBroker(context.getInteger(KEY_CONNECTIONSPERBROKER, 10))
-                    .statsInterval(NumberUtils.toLong(config.getParams().get(KEY_STATS_INTERVAL_SECONDS), -1),
-                            TimeUnit.SECONDS)
-                    .build();
-            this.baseBuilder = client.newProducer();
-            // Map<String, Object> builderConf = new HashMap<>();
-            // builderConf.putAll(context.getParameters());
-            this.baseBuilder
-                    .sendTimeout(context.getInteger(KEY_SENDTIMEOUT, 0), TimeUnit.MILLISECONDS)
-                    .maxPendingMessages(context.getInteger(KEY_MAXPENDINGMESSAGES, 500))
-                    .maxPendingMessagesAcrossPartitions(
-                            context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 60000))
-                    .batchingMaxMessages(context.getInteger(KEY_BATCHINGMAXMESSAGES, 500));
-            this.baseBuilder
-                    .batchingMaxPublishDelay(context.getInteger(KEY_BATCHINGMAXPUBLISHDELAY, 100),
-                            TimeUnit.MILLISECONDS);
-            this.baseBuilder
-                    .batchingMaxBytes(context.getInteger(KEY_BATCHINGMAXBYTES, 131072));
-            this.baseBuilder
-                    .accessMode(ProducerAccessMode.Shared)
-                    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
-                    .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, true));
-            this.baseBuilder
-                    .roundRobinRouterBatchingPartitionSwitchFrequency(
-                            context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 60))
-                    .enableBatching(context.getBoolean(KEY_ENABLEBATCHING, true))
-                    .compressionType(this.getPulsarCompressionType());
-        } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * getPulsarCompressionType
-     * 
-     * @return CompressionType
-     */
-    private CompressionType getPulsarCompressionType() {
-        String type = this.context.getString(KEY_COMPRESSIONTYPE);
-        switch (type) {
-            case "LZ4":
-                return CompressionType.LZ4;
-            case "NONE":
-                return CompressionType.NONE;
-            case "ZLIB":
-                return CompressionType.ZLIB;
-            case "ZSTD":
-                return CompressionType.ZSTD;
-            case "SNAPPY":
-                return CompressionType.SNAPPY;
-            default:
-                return CompressionType.NONE;
-        }
-    }
-
-    /**
-     * stop
-     */
-    @Override
-    public void stop() {
-        this.state = LifecycleState.STOP;
-        //
-        for (Entry<String, Producer<byte[]>> entry : this.producerMap.entrySet()) {
-            try {
-                entry.getValue().close();
-            } catch (PulsarClientException e) {
-                LOG.error(e.getMessage(), e);
-            }
-        }
-        try {
-            this.client.close();
-        } catch (PulsarClientException e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * getLifecycleState
-     * 
-     * @return
-     */
-    @Override
-    public LifecycleState getLifecycleState() {
-        return state;
-    }
-
-    /**
-     * send
-     * 
-     * @param event
-     */
-    public boolean send(Event event) {
-        // send
-        Map<String, String> headers = event.getHeaders();
-        String topic = headers.get(Constants.TOPIC);
-        // get producer
-        Producer<byte[]> producer = this.producerMap.get(topic);
-        if (producer == null) {
-            try {
-                LOG.info("try to new a object for topic " + topic);
-                SecureRandom secureRandom = new SecureRandom(
-                        (workerName + "-" + cacheClusterName + "-" + topic + System.currentTimeMillis()).getBytes());
-                String producerName = workerName + "-" + cacheClusterName + "-" + topic + "-" + secureRandom.nextLong();
-                producer = baseBuilder.clone().topic(topic)
-                        .producerName(producerName)
-                        .create();
-                LOG.info("create new producer success:{}", producer.getProducerName());
-                Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(topic, producer);
-                if (oldProducer != null) {
-                    producer.close();
-                    LOG.info("close producer success:{}", producer.getProducerName());
-                    producer = oldProducer;
-                }
-            } catch (Throwable ex) {
-                LOG.error("create new producer failed", ex);
-            }
-        }
-        // create producer failed
-        if (producer == null) {
-            sinkContext.getBufferQueue().release(event.getBody().length);
-            this.addMetric(event, topic, false, 0);
-            return false;
-        }
-        // sendAsync
-        CompletableFuture<MessageId> future = null;
-        String messageKey = headers.get(Constants.MESSAGE_KEY);
-        long sendTime = System.currentTimeMillis();
-        if (messageKey == null) {
-            future = producer.newMessage().properties(headers)
-                    .value(event.getBody()).sendAsync();
-        } else {
-            future = producer.newMessage().key(messageKey).properties(headers)
-                    .value(event.getBody()).sendAsync();
-        }
-        // callback
-        future.whenCompleteAsync((msgId, ex) -> {
-            if (ex != null) {
-                LOG.error("Send fail:{}", ex.getMessage());
-                LOG.error(ex.getMessage(), ex);
-                sinkContext.getBufferQueue().offer(event);
-                this.addMetric(event, topic, false, 0);
-            } else {
-                sinkContext.getBufferQueue().release(event.getBody().length);
-                this.addMetric(event, topic, true, sendTime);
-            }
-        });
-        return true;
-    }
-
-    /**
-     * addMetric
-     * 
-     * @param event
-     * @param topic
-     * @param result
-     * @param sendTime
-     */
-    private void addMetric(Event event, String topic, boolean result, long sendTime) {
-        // metric
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.sinkContext.getProxyClusterId());
-        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.cacheClusterName);
-        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
-        DataProxyMetricItem.fillInlongId(event, dimensions);
-        DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
-        DataProxyMetricItem metricItem = this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
-        if (result) {
-            metricItem.sendSuccessCount.incrementAndGet();
-            metricItem.sendSuccessSize.addAndGet(event.getBody().length);
-            AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
-            if (sendTime > 0) {
-                long msgTime = AuditUtils.getLogTime(event);
-                long currentTime = System.currentTimeMillis();
-                long sinkDuration = currentTime - sendTime;
-                long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
-                long wholeDuration = currentTime - msgTime;
-                metricItem.sinkDuration.addAndGet(sinkDuration);
-                metricItem.nodeDuration.addAndGet(nodeDuration);
-                metricItem.wholeDuration.addAndGet(wholeDuration);
-            }
-        } else {
-            metricItem.sendFailCount.incrementAndGet();
-            metricItem.sendFailSize.addAndGet(event.getBody().length);
-        }
-    }
-
-    /**
-     * get cacheClusterName
-     * 
-     * @return the cacheClusterName
-     */
-    public String getCacheClusterName() {
-        return cacheClusterName;
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerFederation.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerFederation.java
deleted file mode 100644
index 7bc5153bd..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerFederation.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flume.Event;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * 
- * PulsarProducerSet
- */
-public class PulsarProducerFederation {
-
-    public static final Logger LOG = LoggerFactory.getLogger(PulsarProducerFederation.class);
-
-    private final String workerName;
-    private final PulsarFederationSinkContext context;
-    private Timer reloadTimer;
-
-    private List<PulsarProducerCluster> clusterList = new ArrayList<>();
-    private List<PulsarProducerCluster> deletingClusterList = new ArrayList<>();
-
-    private AtomicInteger clusterIndex = new AtomicInteger(0);
-
-    /**
-     * Constructor
-     * 
-     * @param workerName
-     * @param context
-     */
-    public PulsarProducerFederation(String workerName, PulsarFederationSinkContext context) {
-        this.workerName = workerName;
-        this.context = context;
-    }
-
-    /**
-     * start
-     */
-    public void start() {
-        try {
-            this.reload();
-            this.setReloadTimer();
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * close
-     */
-    public void close() {
-        try {
-            this.reloadTimer.cancel();
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-        for (PulsarProducerCluster cluster : this.clusterList) {
-            cluster.stop();
-        }
-    }
-
-    /**
-     * setReloadTimer
-     */
-    private void setReloadTimer() {
-        reloadTimer = new Timer(true);
-        TimerTask task = new TimerTask() {
-
-            public void run() {
-                reload();
-            }
-        };
-        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + context.getReloadInterval()),
-                context.getReloadInterval());
-    }
-
-    /**
-     * reload
-     */
-    public void reload() {
-        try {
-            // stop deleted cluster
-            deletingClusterList.forEach(item -> {
-                item.stop();
-            });
-            deletingClusterList.clear();
-            // update cluster list
-            List<CacheClusterConfig> configList = this.context.getCacheHolder().getConfigList();
-            List<PulsarProducerCluster> newClusterList = new ArrayList<>(configList.size());
-            // prepare
-            Set<String> newClusterNames = new HashSet<>();
-            configList.forEach(item -> {
-                newClusterNames.add(item.getClusterName());
-            });
-            Set<String> oldClusterNames = new HashSet<>();
-            clusterList.forEach(item -> {
-                oldClusterNames.add(item.getCacheClusterName());
-            });
-            // add
-            for (CacheClusterConfig config : configList) {
-                if (!oldClusterNames.contains(config.getClusterName())) {
-                    PulsarProducerCluster cluster = new PulsarProducerCluster(workerName, config, context);
-                    cluster.start();
-                    newClusterList.add(cluster);
-                }
-            }
-            // remove
-            for (PulsarProducerCluster cluster : this.clusterList) {
-                if (newClusterNames.contains(cluster.getCacheClusterName())) {
-                    newClusterList.add(cluster);
-                } else {
-                    deletingClusterList.add(cluster);
-                }
-            }
-            this.clusterList = newClusterList;
-        } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * send
-     * 
-     * @param event
-     */
-    public boolean send(Event event) {
-        int currentIndex = clusterIndex.getAndIncrement();
-        if (currentIndex > Integer.MAX_VALUE / 2) {
-            clusterIndex.set(0);
-        }
-        List<PulsarProducerCluster> currentClusterList = this.clusterList;
-        int currentSize = currentClusterList.size();
-        int realIndex = currentIndex % currentSize;
-        PulsarProducerCluster clusterProducer = currentClusterList.get(realIndex);
-        return clusterProducer.send(event);
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java
deleted file mode 100644
index bf2bd258d..000000000
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink;
-
-import com.google.common.base.Charsets;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.event.EventBuilder;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestPulsarSink {
-
-    private MemoryChannel channel;
-
-    @Before
-    public void setUp() {
-        PulsarSink sink = new PulsarSink();
-        channel = new MemoryChannel();
-        Context context = new Context();
-        context.put("type", "org.apache.inlong.dataproxy.sink.PulsarSink");
-        sink.setChannel(channel);
-
-        this.channel.configure(context);
-    }
-
-    @Test
-    public void testProcess() {
-        Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
-
-        Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        for (int i = 0; i < 10; i++) {
-            channel.put(event);
-        }
-        transaction.commit();
-        transaction.close();
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestTubeSink.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestTubeSink.java
deleted file mode 100644
index 287bf0009..000000000
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestTubeSink.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink;
-
-import com.google.common.base.Charsets;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.event.EventBuilder;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestTubeSink {
-
-    private MemoryChannel channel;
-
-    @Before
-    public void setUp() {
-        TubeSink sink = new TubeSink();
-        channel = new MemoryChannel();
-        Context context = new Context();
-        context.put("type", "org.apache.inlong.dataproxy.sink.TubeSink");
-        sink.setChannel(channel);
-
-        this.channel.configure(context);
-    }
-
-    @Test
-    public void testProcess() {
-        Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
-
-        Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        for (int i = 0; i < 10; i++) {
-            channel.put(event);
-        }
-        transaction.commit();
-        transaction.close();
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
deleted file mode 100644
index 9d617562b..000000000
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.dataproxy.config.loader.TestContextIdTopicConfigLoader;
-import org.apache.inlong.dataproxy.utils.MockUtils;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * TestPulsarFederationSink
- */
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.management.*")
-@PrepareForTest({MetricRegister.class})
-public class TestPulsarFederationSink {
-
-    public static final Logger LOG = LoggerFactory.getLogger(TestContextIdTopicConfigLoader.class);
-    public static Context context;
-    public static Context sinkContext;
-    public static PulsarFederationSink sinkObj;
-
-    /**
-     * setup
-     */
-    @BeforeClass
-    public static void setUp() {
-        Map<String, String> result = new ConcurrentHashMap<>();
-        try (InputStream inStream = TestPulsarFederationSink.class.getClassLoader().getResource(
-                "dataproxy-pulsar.conf")
-                .openStream()) {
-            Properties props = new Properties();
-            props.load(inStream);
-            for (Map.Entry<Object, Object> entry : props.entrySet()) {
-                result.put((String) entry.getKey(), (String) entry.getValue());
-            }
-            context = new Context(result);
-            sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
-        } catch (Exception e) {
-            LOG.error("fail to load properties, file ={}, and e= {}", "dataproxy-pulsar.conf", e);
-        }
-        sinkObj = new PulsarFederationSink();
-        sinkObj.configure(sinkContext);
-    }
-
-    /**
-     * testResult
-     */
-    @Test
-    public void testResult() throws Exception {
-        MockUtils.mockMetricRegister();
-        // mock
-        Channel channel = MockUtils.mockChannel();
-        sinkObj.setChannel(channel);
-        sinkObj.process();
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
deleted file mode 100644
index b161218b4..000000000
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.dataproxy.utils.MockUtils;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * TestPulsarProducerFederation
- */
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.management.*")
-@PrepareForTest({PulsarClient.class, ClientBuilder.class, MessageId.class,
-        Producer.class, ProducerBuilder.class, TypedMessageBuilder.class, MetricRegister.class})
-public class TestPulsarProducerFederation {
-
-    public static final Logger LOG = LoggerFactory.getLogger(TestPulsarProducerFederation.class);
-    public static Context context;
-    public static Context sinkContext;
-
-    /**
-     * setup
-     */
-    @BeforeClass
-    public static void setUp() {
-        Map<String, String> result = new ConcurrentHashMap<>();
-        try (InputStream inStream = TestPulsarFederationSink.class.getClassLoader().getResource(
-                "dataproxy-pulsar.conf").openStream()) {
-            MockUtils.mockMetricRegister();
-            Properties props = new Properties();
-            props.load(inStream);
-            for (Map.Entry<Object, Object> entry : props.entrySet()) {
-                result.put((String) entry.getKey(), (String) entry.getValue());
-            }
-            context = new Context(result);
-            sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
-            MockUtils.mockPulsarClient();
-        } catch (Exception e) {
-            LOG.error("fail to load properties, file ={}, and e= {}", "dataproxy-pulsar.conf", e);
-        }
-    }
-
-    /**
-     * testResult
-     */
-    @Test
-    public void testResult() throws Exception {
-        String workerName = "workerName";
-        PulsarFederationSinkContext pulsarContext = new PulsarFederationSinkContext(MockUtils.SINK_ID, sinkContext);
-        PulsarProducerFederation federation = new PulsarProducerFederation(workerName, pulsarContext);
-        federation.start();
-        Event event = MockUtils.mockEvent();
-        boolean result = federation.send(event);
-        assertTrue(result);
-    }
-
-}