You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/15 10:13:58 UTC

[incubator-inlong] branch master updated: [INLONG-2379] DataProxy support Pulsar sink of PB compression cache message protocol. (#2502)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a21bb1b  [INLONG-2379] DataProxy support Pulsar sink of PB compression cache message protocol. (#2502)
a21bb1b is described below

commit a21bb1b02ea7e3651fa734ee475a8bc762c067aa
Author: 卢春亮 <94...@qq.com>
AuthorDate: Tue Feb 15 18:13:49 2022 +0800

    [INLONG-2379] DataProxy support Pulsar sink of PB compression cache message protocol. (#2502)
---
 .../inlong/dataproxy/dispatch/DispatchManager.java |   8 +-
 .../sink/pulsarzone/PulsarClusterProducer.java     | 317 +++++++++++++++++++++
 .../sink/pulsarzone/PulsarZoneProducer.java        | 164 +++++++++++
 .../dataproxy/sink/pulsarzone/PulsarZoneSink.java  | 157 ++++++++++
 .../sink/pulsarzone/PulsarZoneSinkContext.java     | 261 +++++++++++++++++
 .../sink/pulsarzone/PulsarZoneWorker.java          | 107 +++++++
 6 files changed, 1010 insertions(+), 4 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
index 9e05100..fecae08 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
@@ -43,12 +43,12 @@ public class DispatchManager {
     public static final long DEFAULT_DISPATCH_MAX_PACKSIZE = 327680;
     public static final long MINUTE_MS = 60L * 1000;
 
-    private LinkedBlockingQueue<DispatchProfile> dispatchQueue;
     private final long dispatchTimeout;
     private final long maxPackCount;
     private final long maxPackSize;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue;
     private ConcurrentHashMap<String, DispatchProfile> profileCache = new ConcurrentHashMap<>();
-    //
+    // flag indicating that the manager needs to output overtime data.
     private AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
 
     /**
@@ -78,14 +78,14 @@ public class DispatchManager {
         String eventUid = event.getUid();
         long dispatchTime = event.getMsgTime() - event.getMsgTime() % MINUTE_MS;
         String dispatchKey = eventUid + "." + dispatchTime;
-        //
+        // find dispatch profile
         DispatchProfile dispatchProfile = this.profileCache.get(dispatchKey);
         if (dispatchProfile == null) {
             dispatchProfile = new DispatchProfile(eventUid, event.getInlongGroupId(), event.getInlongStreamId(),
                     dispatchTime);
             this.profileCache.put(dispatchKey, dispatchProfile);
         }
-        //
+        // add event
         boolean addResult = dispatchProfile.addEvent(event, maxPackCount, maxPackSize);
         if (!addResult) {
             DispatchProfile newDispatchProfile = new DispatchProfile(eventUid, event.getInlongGroupId(),
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
new file mode 100644
index 0000000..0a1e80c
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsarzone;
+
+import static org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
+import static org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
+
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
+import org.apache.inlong.sdk.commons.protocol.EventUtils;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PulsarClusterProducer
+ */
+public class PulsarClusterProducer implements LifecycleAware {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarClusterProducer.class);
+
+    public static final String KEY_SERVICE_URL = "serviceUrl";
+    public static final String KEY_AUTHENTICATION = "authentication";
+
+    public static final String KEY_ENABLEBATCHING = "enableBatching";
+    public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes";
+    public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages";
+    public static final String KEY_BATCHINGMAXPUBLISHDELAY = "batchingMaxPublishDelay";
+    public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages";
+    public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS = "maxPendingMessagesAcrossPartitions";
+    public static final String KEY_SENDTIMEOUT = "sendTimeout";
+    public static final String KEY_COMPRESSIONTYPE = "compressionType";
+    public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull";
+    public static final String KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouter"
+            + "BatchingPartitionSwitchFrequency";
+
+    public static final String KEY_IOTHREADS = "ioThreads";
+    public static final String KEY_MEMORYLIMIT = "memoryLimit";
+    public static final String KEY_CONNECTIONSPERBROKER = "connectionsPerBroker";
+
+    private final String workerName;
+    private final CacheClusterConfig config;
+    private final PulsarZoneSinkContext sinkContext;
+    private final Context context;
+    private final String cacheClusterName;
+    private LifecycleState state;
+
+    /**
+     * pulsar client
+     */
+    private PulsarClient client;
+    private ProducerBuilder<byte[]> baseBuilder;
+
+    private Map<String, Producer<byte[]>> producerMap = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor. Only stores its collaborators and sets the state to {@code IDLE};
+     * the Pulsar client is not created here but in {@link #start()}.
+     * 
+     * @param workerName name of the owning worker; used as the prefix of generated producer names
+     * @param config     cache cluster configuration; its cluster name and params are read by this class
+     * @param context    sink context, kept for topic lookup, send metrics and the retry queue;
+     *                   its producer context supplies the client and producer settings
+     */
+    public PulsarClusterProducer(String workerName, CacheClusterConfig config, PulsarZoneSinkContext context) {
+        this.workerName = workerName;
+        this.config = config;
+        this.sinkContext = context;
+        this.context = context.getProducerContext();
+        this.state = LifecycleState.IDLE;
+        this.cacheClusterName = config.getClusterName();
+    }
+
+    /**
+     * Marks this producer as started, builds the Pulsar client from the cluster params
+     * ({@code serviceUrl} and the {@code authentication} token) and prepares the base
+     * producer builder that is cloned per topic in {@link #send(DispatchProfile)}.
+     * Tuning values are read from the producer context, falling back to the defaults
+     * written inline below.
+     * Any failure is logged and swallowed: the state stays {@code START} while
+     * {@code client} and/or {@code baseBuilder} may remain unset.
+     */
+    @Override
+    public void start() {
+        this.state = LifecycleState.START;
+        // create pulsar client
+        try {
+            String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
+            String authentication = config.getParams().get(KEY_AUTHENTICATION);
+            this.client = PulsarClient.builder()
+                    .serviceUrl(serviceUrl)
+                    .authentication(AuthenticationFactory.token(authentication))
+                    .ioThreads(context.getInteger(KEY_IOTHREADS, 1))
+                    .memoryLimit(context.getLong(KEY_MEMORYLIMIT, 1073741824L), SizeUnit.BYTES)
+                    .connectionsPerBroker(context.getInteger(KEY_CONNECTIONSPERBROKER, 10))
+                    .build();
+            this.baseBuilder = client.newProducer();
+//            Map<String, Object> builderConf = new HashMap<>();
+//            builderConf.putAll(context.getParameters());
+            this.baseBuilder
+                    .sendTimeout(context.getInteger(KEY_SENDTIMEOUT, 0), TimeUnit.MILLISECONDS)
+                    .maxPendingMessages(context.getInteger(KEY_MAXPENDINGMESSAGES, 500))
+                    .maxPendingMessagesAcrossPartitions(
+                            context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 60000))
+                    .batchingMaxMessages(context.getInteger(KEY_BATCHINGMAXMESSAGES, 500))
+                    .batchingMaxPublishDelay(context.getInteger(KEY_BATCHINGMAXPUBLISHDELAY, 100),
+                            TimeUnit.MILLISECONDS)
+                    .batchingMaxBytes(context.getInteger(KEY_BATCHINGMAXBYTES, 131072));
+            this.baseBuilder
+                    .accessMode(ProducerAccessMode.Shared)
+                    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                    .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, true))
+                    .roundRobinRouterBatchingPartitionSwitchFrequency(
+                            context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 60))
+                    .enableBatching(context.getBoolean(KEY_ENABLEBATCHING, true))
+                    .compressionType(this.getPulsarCompressionType());
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Maps the configured {@code compressionType} string of the producer context to the
+     * Pulsar {@link CompressionType}.
+     * 
+     * @return the matching compression type; {@link CompressionType#NONE} when the key is
+     *         not configured or holds an unknown value
+     */
+    private CompressionType getPulsarCompressionType() {
+        String type = this.context.getString(KEY_COMPRESSIONTYPE);
+        // a switch on a null String throws NullPointerException; treat "not configured" as NONE
+        if (type == null) {
+            return CompressionType.NONE;
+        }
+        switch (type) {
+            case "LZ4" :
+                return CompressionType.LZ4;
+            case "NONE" :
+                return CompressionType.NONE;
+            case "ZLIB" :
+                return CompressionType.ZLIB;
+            case "ZSTD" :
+                return CompressionType.ZSTD;
+            case "SNAPPY" :
+                return CompressionType.SNAPPY;
+            default :
+                return CompressionType.NONE;
+        }
+    }
+
+    /**
+     * Marks this producer as stopped, then closes every per-topic producer and finally the
+     * Pulsar client. Close failures are logged and do not prevent the remaining resources
+     * from being closed.
+     */
+    @Override
+    public void stop() {
+        this.state = LifecycleState.STOP;
+        // close every per-topic producer; one failure must not prevent closing the others
+        for (Entry<String, Producer<byte[]>> entry : this.producerMap.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (PulsarClientException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        // client is still null when start() failed before the client was built
+        if (this.client == null) {
+            return;
+        }
+        try {
+            this.client.close();
+        } catch (PulsarClientException e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * getLifecycleState
+     * 
+     * @return the current lifecycle state: {@code IDLE} after construction, {@code START}
+     *         once {@link #start()} has been called, {@code STOP} once {@link #stop()} has been called
+     */
+    @Override
+    public LifecycleState getLifecycleState() {
+        return state;
+    }
+
+    /**
+     * Sends one dispatch profile asynchronously to the topic mapped to its uid.
+     * <p>
+     * The events of the profile are encoded into a single message body, the protocol headers
+     * are attached as message properties, and the result is reported through the sink
+     * context metrics. When no producer is available, the send throws, or the asynchronous
+     * send fails, the profile is offered back to the dispatch queue so it can be retried.
+     * A profile whose uid has no topic is only counted as failed and is not re-queued.
+     * 
+     * @param  event the dispatch profile to send
+     * @return       {@code true} when the message was handed to the Pulsar producer (the final
+     *               result arrives asynchronously); {@code false} when it could not be handed over
+     */
+    public boolean send(DispatchProfile event) {
+        try {
+            // topic
+            String topic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
+            if (topic == null) {
+                sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
+                return false;
+            }
+            // get producer
+            Producer<byte[]> producer = this.getOrCreateProducer(topic);
+            // create producer failed
+            if (producer == null) {
+                sinkContext.getDispatchQueue().offer(event);
+                sinkContext.addSendResultMetric(event, topic, false, 0);
+                return false;
+            }
+            // headers
+            Map<String, String> headers = this.encodeCacheMessageHeaders(event);
+            // compress
+            byte[] bodyBytes = EventUtils.encodeCacheMessageBody(sinkContext.getCompressType(), event.getEvents());
+            // sendAsync
+            long sendTime = System.currentTimeMillis();
+            CompletableFuture<MessageId> future = producer.newMessage().properties(headers)
+                    .value(bodyBytes).sendAsync();
+            // callback
+            future.whenCompleteAsync((msgId, ex) -> {
+                if (ex != null) {
+                    LOG.error("Send fail:{}", ex.getMessage());
+                    LOG.error(ex.getMessage(), ex);
+                    sinkContext.getDispatchQueue().offer(event);
+                    sinkContext.addSendResultMetric(event, topic, false, sendTime);
+                } else {
+                    sinkContext.addSendResultMetric(event, topic, true, sendTime);
+                }
+            });
+            return true;
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            sinkContext.getDispatchQueue().offer(event);
+            sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
+            return false;
+        }
+    }
+
+    /**
+     * Returns the producer registered for the topic, creating and registering one on first use.
+     * <p>
+     * When another thread registers a producer for the same topic first, the producer created
+     * here is closed and the registered one is returned. A failure while closing that
+     * duplicate is only logged, so the caller never receives the closed instance.
+     * 
+     * @param  topic the Pulsar topic
+     * @return       the producer to use, or {@code null} when a new producer could not be created
+     */
+    private Producer<byte[]> getOrCreateProducer(String topic) {
+        Producer<byte[]> producer = this.producerMap.get(topic);
+        if (producer != null) {
+            return producer;
+        }
+        Producer<byte[]> newProducer;
+        try {
+            LOG.info("try to new a object for topic " + topic);
+            SecureRandom secureRandom = new SecureRandom(
+                    (workerName + "-" + cacheClusterName + "-" + topic + System.currentTimeMillis())
+                            .getBytes());
+            String producerName = workerName + "-" + cacheClusterName + "-" + topic + "-"
+                    + secureRandom.nextLong();
+            newProducer = baseBuilder.clone().topic(topic)
+                    .producerName(producerName)
+                    .create();
+            LOG.info("create new producer success:{}", newProducer.getProducerName());
+        } catch (Throwable ex) {
+            LOG.error("create new producer failed", ex);
+            return null;
+        }
+        Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(topic, newProducer);
+        if (oldProducer == null) {
+            return newProducer;
+        }
+        // lost the race: discard our duplicate, but always hand back the registered producer
+        try {
+            newProducer.close();
+            LOG.info("close producer success:{}", newProducer.getProducerName());
+        } catch (Throwable ex) {
+            LOG.error("close duplicate producer failed", ex);
+        }
+        return oldProducer;
+    }
+
+    /**
+     * Builds the message properties (headers) of the cache message protocol for one
+     * dispatch profile. A new map is created on every call.
+     * 
+     * @param  event the dispatch profile whose group id, stream id, event count and size are encoded
+     * @return       Map of header name to string value
+     */
+    public Map<String, String> encodeCacheMessageHeaders(DispatchProfile event) {
+        Map<String, String> headers = new HashMap<>();
+        // version int32 protocol version, the value is 1
+        headers.put(HEADER_KEY_VERSION, HEADER_CACHE_VERSION_1);
+        // inlongGroupId string inlongGroupId
+        headers.put(EventConstants.INLONG_GROUP_ID, event.getInlongGroupId());
+        // inlongStreamId string inlongStreamId
+        headers.put(EventConstants.INLONG_STREAM_ID, event.getInlongStreamId());
+        // proxyName string proxy node id, IP or container name
+        headers.put(EventConstants.HEADER_KEY_PROXY_NAME, sinkContext.getNodeId());
+        // packTime int64 pack time, milliseconds
+        headers.put(EventConstants.HEADER_KEY_PACK_TIME, String.valueOf(System.currentTimeMillis()));
+        // msgCount int32 message count
+        headers.put(EventConstants.HEADER_KEY_MSG_COUNT, String.valueOf(event.getEvents().size()));
+        // srcLength int32 total length of raw messages body
+        headers.put(EventConstants.HEADER_KEY_SRC_LENGTH, String.valueOf(event.getSize()));
+        // compressType int
+        // compress type of body data
+        // INLONG_NO_COMPRESS = 0,
+        // INLONG_GZ = 1,
+        // INLONG_SNAPPY = 2
+        headers.put(EventConstants.HEADER_KEY_COMPRESS_TYPE,
+                String.valueOf(sinkContext.getCompressType().getNumber()));
+        // messageKey string partition hash key, optional (not set here)
+        return headers;
+    }
+
+    /**
+     * Returns the name of the cache cluster this producer is bound to, as read from the
+     * cluster configuration in the constructor.
+     * 
+     * @return the cacheClusterName
+     */
+    public String getCacheClusterName() {
+        return cacheClusterName;
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneProducer.java
new file mode 100644
index 0000000..b000a45
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneProducer.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsarzone;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * PulsarZoneProducer
+ */
+public class PulsarZoneProducer {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarZoneProducer.class);
+    public static final int MAX_INDEX = Integer.MAX_VALUE / 2;
+
+    private final String workerName;
+    private final PulsarZoneSinkContext context;
+    private Timer reloadTimer;
+
+    private List<PulsarClusterProducer> clusterList = new ArrayList<>();
+    private List<PulsarClusterProducer> deletingClusterList = new ArrayList<>();
+
+    private AtomicInteger clusterIndex = new AtomicInteger(0);
+
+    /**
+     * Constructor. Only stores its arguments; cluster producers are created in
+     * {@link #reload()}, which {@link #start()} calls.
+     * 
+     * @param workerName name of the owning worker, passed on to every cluster producer
+     * @param context    sink context providing the cache cluster configuration and reload interval
+     */
+    public PulsarZoneProducer(String workerName, PulsarZoneSinkContext context) {
+        this.workerName = workerName;
+        this.context = context;
+    }
+
+    /**
+     * Loads the initial cluster list through {@link #reload()} and then schedules the
+     * periodic reload. Exceptions are logged and not propagated.
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Cancels the reload timer and stops every cluster producer, including those that the
+     * last {@link #reload()} removed from the active list but has not stopped yet.
+     * A failure while stopping one cluster is logged and does not prevent stopping the others.
+     */
+    public void close() {
+        // reloadTimer is only assigned by setReloadTimer(); it is null when start() was never called
+        if (this.reloadTimer != null) {
+            try {
+                this.reloadTimer.cancel();
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        for (PulsarClusterProducer cluster : this.clusterList) {
+            try {
+                cluster.stop();
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        // clusters dropped by the last reload() are only stopped on the next reload(),
+        // which will not happen any more; stop them here so their clients are not leaked.
+        // NOTE(review): a reload() still running on the timer thread may touch this list
+        // concurrently; confirm whether close() can overlap with the timer task.
+        for (PulsarClusterProducer cluster : this.deletingClusterList) {
+            try {
+                cluster.stop();
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        this.deletingClusterList.clear();
+    }
+
+    /**
+     * Creates a daemon timer that calls {@link #reload()} repeatedly. The first run happens
+     * one reload interval from now and later runs are one reload interval apart; the
+     * interval is read from the sink context.
+     */
+    private void setReloadTimer() {
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+
+            public void run() {
+                reload();
+            }
+        };
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + context.getReloadInterval()),
+                context.getReloadInterval());
+    }
+
+    /**
+     * Reconciles the running cluster producers with the current cache cluster configuration.
+     * <p>
+     * Producers scheduled for deletion by the previous call are stopped first. Then a
+     * producer is created and started for every configured cluster name that is not running
+     * yet, producers whose cluster is still configured are kept, and producers whose cluster
+     * is no longer configured are moved to the deleting list, to be stopped by the next call.
+     * Finally the active list reference is replaced by the new list.
+     * All errors are logged and swallowed.
+     */
+    public void reload() {
+        try {
+            // stop the clusters that the previous reload removed from the active list
+            deletingClusterList.forEach(item -> {
+                item.stop();
+            });
+            deletingClusterList.clear();
+            // update cluster list
+            List<CacheClusterConfig> configList = this.context.getCacheHolder().getConfigList();
+            List<PulsarClusterProducer> newClusterList = new ArrayList<>(configList.size());
+            // prepare: collect the configured names and the currently running names
+            Set<String> newClusterNames = new HashSet<>();
+            configList.forEach(item -> {
+                newClusterNames.add(item.getClusterName());
+            });
+            Set<String> oldClusterNames = new HashSet<>();
+            clusterList.forEach(item -> {
+                oldClusterNames.add(item.getCacheClusterName());
+            });
+            // add: start a producer for every configured cluster that is not running yet
+            for (CacheClusterConfig config : configList) {
+                if (!oldClusterNames.contains(config.getClusterName())) {
+                    PulsarClusterProducer cluster = new PulsarClusterProducer(workerName, config, context);
+                    cluster.start();
+                    newClusterList.add(cluster);
+                }
+            }
+            // remove: keep clusters that are still configured, defer stopping the others
+            for (PulsarClusterProducer cluster : this.clusterList) {
+                if (newClusterNames.contains(cluster.getCacheClusterName())) {
+                    newClusterList.add(cluster);
+                } else {
+                    deletingClusterList.add(cluster);
+                }
+            }
+            this.clusterList = newClusterList;
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Sends the dispatch profile through one of the active cluster producers, chosen
+     * round-robin.
+     * <p>
+     * When no cluster producer is active, the profile is offered back to the dispatch queue
+     * and counted as a failed send, the same way the cluster producer handles a profile it
+     * cannot send.
+     * 
+     * @param  event the dispatch profile to send
+     * @return       the result of the chosen cluster producer, or {@code false} when no
+     *               cluster producer is active
+     */
+    public boolean send(DispatchProfile event) {
+        // take one snapshot of the list: reload() replaces the reference from the timer thread
+        List<PulsarClusterProducer> currentClusterList = this.clusterList;
+        int currentSize = currentClusterList.size();
+        if (currentSize == 0) {
+            // without this guard "currentIndex % 0" throws ArithmeticException and the
+            // profile is not put back on the queue
+            // NOTE(review): the caller is not visible here; confirm it backs off on a
+            // false result, otherwise it may re-poll this profile in a tight loop
+            context.getDispatchQueue().offer(event);
+            context.addSendResultMetric(event, event.getUid(), false, 0);
+            return false;
+        }
+        int currentIndex = clusterIndex.getAndIncrement();
+        // reset well before Integer.MAX_VALUE so the counter never overflows to a negative index
+        if (currentIndex > MAX_INDEX) {
+            clusterIndex.set(0);
+        }
+        int realIndex = currentIndex % currentSize;
+        PulsarClusterProducer clusterProducer = currentClusterList.get(realIndex);
+        return clusterProducer.send(event);
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java
new file mode 100644
index 0000000..61aee51
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsarzone;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.dataproxy.dispatch.DispatchManager;
+import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
+import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PulsarZoneSink
+ */
+public class PulsarZoneSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarZoneSink.class);
+
+    private Context parentContext;
+    private PulsarZoneSinkContext context;
+    private List<PulsarZoneWorker> workers = new ArrayList<>();
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // scheduled thread pool
+    // reload
+    // dispatch
+    private ScheduledExecutorService scheduledPool;
+
+    /**
+     * Logs the given configuration and keeps it for {@link #start()}, where the sink
+     * context and the dispatch manager are built from it.
+     * 
+     * @param context the Flume context of this sink
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.parentContext = context;
+    }
+
+    /**
+     * Creates and starts the sink context, creates the dispatch manager, schedules a
+     * periodic task that asks the dispatch manager to output overtime data (initial delay
+     * and period are both the dispatch timeout, in milliseconds) and starts
+     * {@code getMaxThreads()} workers.
+     * Exceptions are logged and swallowed; {@code super.start()} is called in every case.
+     */
+    @Override
+    public void start() {
+        try {
+            this.context = new PulsarZoneSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
+            if (getChannel() == null) {
+                LOG.error("channel is null");
+            }
+            this.context.start();
+            this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+            this.scheduledPool = Executors.newScheduledThreadPool(2);
+            // dispatch: periodically flag the manager to output overtime data
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    dispatchManager.setNeedOutputOvertimeData();
+                }
+            }, this.dispatchManager.getDispatchTimeout(), this.dispatchManager.getDispatchTimeout(),
+                    TimeUnit.MILLISECONDS);
+            // create worker
+            for (int i = 0; i < context.getMaxThreads(); i++) {
+                PulsarZoneWorker worker = new PulsarZoneWorker(this.getName(), i, context);
+                worker.start();
+                this.workers.add(worker);
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        super.start();
+    }
+
+    /**
+     * Shuts down the scheduled pool, closes every worker and the sink context, then calls
+     * {@code super.stop()}. Failures while closing a worker or the context are logged so
+     * that the remaining resources are still released and {@code super.stop()} always runs.
+     */
+    @Override
+    public void stop() {
+        // the pool is never shut down elsewhere; without this its threads outlive the sink.
+        // It is null when start() failed before the pool was created.
+        if (this.scheduledPool != null) {
+            this.scheduledPool.shutdown();
+        }
+        for (PulsarZoneWorker worker : workers) {
+            try {
+                worker.close();
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        // context is null when start() failed while constructing it
+        if (this.context != null) {
+            try {
+                this.context.close();
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        super.stop();
+    }
+
+    /**
+     * Takes one event from the channel inside a transaction and hands it to the dispatch
+     * manager. The transaction is committed as soon as the event has been added to the
+     * dispatch manager, rolled back on any failure, and closed in every case.
+     * 
+     * @return                        {@code BACKOFF} when the channel is empty or processing failed;
+     *                                {@code READY} otherwise, including when an event that is not a
+     *                                {@code ProxyEvent} is dropped and counted as a send failure
+     * @throws EventDeliveryException declared by the sink contract; this method body catches every
+     *                                {@code Throwable} itself
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (!(event instanceof ProxyEvent)) {
+                tx.commit();
+                this.context.addSendFailMetric();
+                return Status.READY;
+            }
+            // hand the event to the dispatch manager
+            ProxyEvent proxyEvent = (ProxyEvent) event;
+            this.dispatchManager.addEvent(proxyEvent);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
new file mode 100644
index 0000000..f867f1d
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsarzone;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
+import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
+import org.apache.inlong.dataproxy.sink.SinkContext;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
+
+/**
+ * Context of the Pulsar zone sink, shared by the sink and its workers.
+ * <p>
+ * On top of {@link SinkContext} it carries the dispatch queue, the producer sub-configuration, the
+ * {@link IdTopicConfigHolder} and {@link CacheClusterConfigHolder}, the compression type, and the helpers
+ * that record send metrics.
+ */
+public class PulsarZoneSinkContext extends SinkContext {
+
+    public static final String KEY_NODE_ID = "nodeId";
+    public static final String PREFIX_PRODUCER = "producer.";
+    public static final String KEY_COMPRESS_TYPE = "compressType";
+
+    private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
+
+    private final String proxyClusterId;
+    private final String nodeId;
+    private final Context producerContext;
+    //
+    private final IdTopicConfigHolder idTopicHolder;
+    private final CacheClusterConfigHolder cacheHolder;
+    private final INLONG_COMPRESSED_TYPE compressType;
+
+    /**
+     * Constructor
+     * <p>
+     * Reads the proxy cluster id, the node id (default {@code "127.0.0.1"}) and the compression type (default
+     * {@code INLONG_SNAPPY}) from {@link CommonPropertiesHolder}, takes the {@code "producer."} sub-properties
+     * of {@code context} as the producer configuration, and configures both config holders with {@code context}.
+     *
+     * @param  sinkName                 sink name, passed to {@link SinkContext}
+     * @param  context                  sink configuration
+     * @param  channel                  channel of the sink, passed to {@link SinkContext}
+     * @param  dispatchQueue            queue of dispatch profiles exposed through {@link #getDispatchQueue()}
+     * @throws IllegalArgumentException if the configured compress type is not the name of an
+     *                                  {@code INLONG_COMPRESSED_TYPE} constant
+     */
+    public PulsarZoneSinkContext(String sinkName, Context context, Channel channel,
+            LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
+        super(sinkName, context, channel);
+        this.dispatchQueue = dispatchQueue;
+        // proxyClusterId
+        this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
+        // nodeId
+        this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
+        // compressionType
+        String strCompressionType = CommonPropertiesHolder.getString(KEY_COMPRESS_TYPE,
+                INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name());
+        this.compressType = INLONG_COMPRESSED_TYPE.valueOf(strCompressionType);
+        // producerContext
+        Map<String, String> producerParams = context.getSubProperties(PREFIX_PRODUCER);
+        this.producerContext = new Context(producerParams);
+        // idTopicHolder
+        this.idTopicHolder = new IdTopicConfigHolder();
+        this.idTopicHolder.configure(context);
+        // cacheHolder
+        this.cacheHolder = new CacheClusterConfigHolder();
+        this.cacheHolder.configure(context);
+    }
+
+    /**
+     * Starts the parent context, then the id-topic holder and the cache-cluster holder.
+     */
+    @Override
+    public void start() {
+        super.start();
+        this.idTopicHolder.start();
+        this.cacheHolder.start();
+    }
+
+    /**
+     * Closes the parent context, then the id-topic holder and the cache-cluster holder.
+     */
+    @Override
+    public void close() {
+        super.close();
+        this.idTopicHolder.close();
+        this.cacheHolder.close();
+    }
+
+    /**
+     * get proxyClusterId
+     *
+     * @return the proxyClusterId
+     */
+    public String getProxyClusterId() {
+        return proxyClusterId;
+    }
+
+    /**
+     * get dispatchQueue
+     *
+     * @return the dispatchQueue
+     */
+    public LinkedBlockingQueue<DispatchProfile> getDispatchQueue() {
+        return dispatchQueue;
+    }
+
+    /**
+     * get producerContext
+     *
+     * @return the producerContext, built from the {@code "producer."} sub-properties of the sink configuration
+     */
+    public Context getProducerContext() {
+        return producerContext;
+    }
+
+    /**
+     * get idTopicHolder
+     *
+     * @return the idTopicHolder
+     */
+    public IdTopicConfigHolder getIdTopicHolder() {
+        return idTopicHolder;
+    }
+
+    /**
+     * get cacheHolder
+     *
+     * @return the cacheHolder
+     */
+    public CacheClusterConfigHolder getCacheHolder() {
+        return cacheHolder;
+    }
+
+    /**
+     * get compressType
+     *
+     * @return the compressType
+     */
+    public INLONG_COMPRESSED_TYPE getCompressType() {
+        return compressType;
+    }
+
+    /**
+     * get nodeId
+     *
+     * @return the nodeId
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * addSendMetric
+     * <p>
+     * Adds the record's event count and size to {@code sendCount} / {@code sendSize} of the metric item that
+     * matches the record.
+     *
+     * @param currentRecord dispatch profile about to be sent
+     * @param bid           value stored under the {@code KEY_SINK_DATA_ID} dimension
+     */
+    public void addSendMetric(DispatchProfile currentRecord, String bid) {
+        DataProxyMetricItem metricItem = this.findRecordMetricItem(currentRecord, bid);
+        long count = currentRecord.getCount();
+        long size = currentRecord.getSize();
+        metricItem.sendCount.addAndGet(count);
+        metricItem.sendSize.addAndGet(size);
+    }
+
+    /**
+     * addSendFailMetric
+     * <p>
+     * Despite its name this increments {@code readFailCount}, not a send-failure counter. The metric item is
+     * keyed by cluster id, sink name and the current time aligned to the audit format interval.
+     */
+    public void addSendFailMetric() {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
+        long auditFormatTime = alignToAuditInterval(System.currentTimeMillis());
+        dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+        DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
+        metricItem.readFailCount.incrementAndGet();
+    }
+
+    /**
+     * fillInlongId
+     * <p>
+     * Puts the record's inlong group id and stream id into {@code dimensions}; a blank id is replaced by
+     * {@code "-"}.
+     *
+     * @param currentRecord dispatch profile providing the ids
+     * @param dimensions    map that receives the two id entries
+     */
+    public static void fillInlongId(DispatchProfile currentRecord, Map<String, String> dimensions) {
+        String inlongGroupId = currentRecord.getInlongGroupId();
+        inlongGroupId = (StringUtils.isBlank(inlongGroupId)) ? "-" : inlongGroupId;
+        String inlongStreamId = currentRecord.getInlongStreamId();
+        inlongStreamId = (StringUtils.isBlank(inlongStreamId)) ? "-" : inlongStreamId;
+        dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, inlongGroupId);
+        dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId);
+    }
+
+    /**
+     * addSendResultMetric
+     * <p>
+     * On failure adds the record's count and size to the send-fail counters. On success adds them to the
+     * send-success counters, audits every event of the record and, when {@code sendTime} is positive, adds
+     * one duration sample per event.
+     *
+     * @param currentRecord dispatch profile whose send result is reported
+     * @param bid           value stored under the {@code KEY_SINK_DATA_ID} dimension
+     * @param result        {@code true} when the send succeeded
+     * @param sendTime      time the send started; duration metrics are skipped when it is not positive
+     */
+    public void addSendResultMetric(DispatchProfile currentRecord, String bid, boolean result, long sendTime) {
+        DataProxyMetricItem metricItem = this.findRecordMetricItem(currentRecord, bid);
+        long count = currentRecord.getCount();
+        long size = currentRecord.getSize();
+        if (!result) {
+            metricItem.sendFailCount.addAndGet(count);
+            metricItem.sendFailSize.addAndGet(size);
+            return;
+        }
+        metricItem.sendSuccessCount.addAndGet(count);
+        metricItem.sendSuccessSize.addAndGet(size);
+        // NOTE(review): a successful send is audited under the READ_SUCCESS id - confirm this is intended.
+        currentRecord.getEvents().forEach((event) -> {
+            AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event);
+        });
+        if (sendTime <= 0) {
+            return;
+        }
+        long currentTime = System.currentTimeMillis();
+        long sinkDuration = currentTime - sendTime;
+        long wholeDuration = currentTime - currentRecord.getDispatchTime();
+        // One sample per event. The durations are not multiplied by the record count again:
+        // the loop already runs once per event, so multiplying would weight them by count squared.
+        currentRecord.getEvents().forEach((event) -> {
+            metricItem.sinkDuration.addAndGet(sinkDuration);
+            metricItem.nodeDuration.addAndGet(currentTime - event.getMsgTime());
+            metricItem.wholeDuration.addAndGet(wholeDuration);
+        });
+    }
+
+    /**
+     * Finds the metric item keyed by cluster id, inlong group/stream id, sink name, {@code bid} and the
+     * record's dispatch time aligned to the audit format interval.
+     *
+     * @param  currentRecord dispatch profile providing the ids and the dispatch time
+     * @param  bid           value stored under the {@code KEY_SINK_DATA_ID} dimension
+     * @return               the metric item returned by the metric item set for these dimensions
+     */
+    private DataProxyMetricItem findRecordMetricItem(DispatchProfile currentRecord, String bid) {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        // metric
+        fillInlongId(currentRecord, dimensions);
+        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
+        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
+        long auditFormatTime = alignToAuditInterval(currentRecord.getDispatchTime());
+        dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+        return this.getMetricItemSet().findMetricItem(dimensions);
+    }
+
+    /**
+     * Rounds a timestamp down to a multiple of the audit format interval.
+     *
+     * @param  timestamp time to align
+     * @return           {@code timestamp} minus its remainder modulo the audit format interval
+     */
+    private static long alignToAuditInterval(long timestamp) {
+        return timestamp - timestamp % CommonPropertiesHolder.getAuditFormatInterval();
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneWorker.java
new file mode 100644
index 0000000..35e54fb
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneWorker.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsarzone;
+
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Worker thread of the Pulsar zone sink.
+ * <p>
+ * Polls {@link DispatchProfile} instances from the dispatch queue of the sink context and passes each one to
+ * its own {@link PulsarZoneProducer}.
+ */
+public class PulsarZoneWorker extends Thread {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarZoneWorker.class);
+
+    private final String workerName;
+    private final PulsarZoneSinkContext context;
+    private final PulsarZoneProducer zoneProducer;
+
+    // Written by the thread that calls start()/close() and read by the worker loop in run();
+    // volatile makes the stop request visible to the loop.
+    private volatile LifecycleState status;
+
+    /**
+     * Constructor
+     *
+     * @param sinkName    name of the owning sink, used as prefix of the worker name
+     * @param workerIndex index of this worker, appended to the worker name
+     * @param context     sink context providing the dispatch queue, the metrics and the process interval
+     */
+    public PulsarZoneWorker(String sinkName, int workerIndex, PulsarZoneSinkContext context) {
+        super();
+        this.workerName = sinkName + "-worker-" + workerIndex;
+        this.context = context;
+        this.zoneProducer = new PulsarZoneProducer(workerName, this.context);
+        this.status = LifecycleState.IDLE;
+    }
+
+    /**
+     * Starts the producer, marks the worker as started and then starts the thread.
+     */
+    @Override
+    public void start() {
+        this.zoneProducer.start();
+        this.status = LifecycleState.START;
+        super.start();
+    }
+
+    /**
+     * Requests the worker loop to stop and closes the producer.
+     * <p>
+     * The stop flag is set before the producer is closed, so the loop stops taking new profiles even when
+     * closing the producer throws.
+     */
+    public void close() {
+        this.status = LifecycleState.STOP;
+        // close all producers
+        this.zoneProducer.close();
+    }
+
+    /**
+     * Worker loop: polls the dispatch queue until the worker is stopped or the thread is interrupted.
+     * <p>
+     * When the queue is empty, or when handling a profile throws, the loop sleeps for one process interval
+     * before the next poll.
+     */
+    @Override
+    public void run() {
+        LOG.info("start PulsarZoneWorker:{}", this.workerName);
+        while (status != LifecycleState.STOP && !Thread.currentThread().isInterrupted()) {
+            try {
+                DispatchProfile profile = context.getDispatchQueue().poll();
+                if (profile == null) {
+                    // nothing queued: wait instead of spinning
+                    this.sleepOneInterval();
+                    continue;
+                }
+                // metric
+                context.addSendMetric(profile, workerName);
+                // send
+                this.zoneProducer.send(profile);
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+                this.sleepOneInterval();
+            }
+        }
+    }
+
+    /**
+     * Sleeps for the process interval of the context.
+     * <p>
+     * An interrupt is logged and the interrupt flag is restored, so that {@link #run()} can end its loop.
+     */
+    private void sleepOneInterval() {
+        try {
+            Thread.sleep(context.getProcessInterval());
+        } catch (InterruptedException e1) {
+            LOG.error(e1.getMessage(), e1);
+            Thread.currentThread().interrupt();
+        }
+    }
+}