You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/15 10:13:58 UTC
[incubator-inlong] branch master updated: [INLONG-2379] DataProxy support Pulsar sink of PB compression cache message protocol. (#2502)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a21bb1b [INLONG-2379] DataProxy support Pulsar sink of PB compression cache message protocol. (#2502)
a21bb1b is described below
commit a21bb1b02ea7e3651fa734ee475a8bc762c067aa
Author: 卢春亮 <94...@qq.com>
AuthorDate: Tue Feb 15 18:13:49 2022 +0800
[INLONG-2379] DataProxy support Pulsar sink of PB compression cache message protocol. (#2502)
---
.../inlong/dataproxy/dispatch/DispatchManager.java | 8 +-
.../sink/pulsarzone/PulsarClusterProducer.java | 317 +++++++++++++++++++++
.../sink/pulsarzone/PulsarZoneProducer.java | 164 +++++++++++
.../dataproxy/sink/pulsarzone/PulsarZoneSink.java | 157 ++++++++++
.../sink/pulsarzone/PulsarZoneSinkContext.java | 261 +++++++++++++++++
.../sink/pulsarzone/PulsarZoneWorker.java | 107 +++++++
6 files changed, 1010 insertions(+), 4 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
index 9e05100..fecae08 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
@@ -43,12 +43,12 @@ public class DispatchManager {
public static final long DEFAULT_DISPATCH_MAX_PACKSIZE = 327680;
public static final long MINUTE_MS = 60L * 1000;
- private LinkedBlockingQueue<DispatchProfile> dispatchQueue;
private final long dispatchTimeout;
private final long maxPackCount;
private final long maxPackSize;
+ private LinkedBlockingQueue<DispatchProfile> dispatchQueue;
private ConcurrentHashMap<String, DispatchProfile> profileCache = new ConcurrentHashMap<>();
- //
+ // flag that manager need to output overtime data.
private AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
/**
@@ -78,14 +78,14 @@ public class DispatchManager {
String eventUid = event.getUid();
long dispatchTime = event.getMsgTime() - event.getMsgTime() % MINUTE_MS;
String dispatchKey = eventUid + "." + dispatchTime;
- //
+ // find dispatch profile
DispatchProfile dispatchProfile = this.profileCache.get(dispatchKey);
if (dispatchProfile == null) {
dispatchProfile = new DispatchProfile(eventUid, event.getInlongGroupId(), event.getInlongStreamId(),
dispatchTime);
this.profileCache.put(dispatchKey, dispatchProfile);
}
- //
+ // add event
boolean addResult = dispatchProfile.addEvent(event, maxPackCount, maxPackSize);
if (!addResult) {
DispatchProfile newDispatchProfile = new DispatchProfile(eventUid, event.getInlongGroupId(),
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
new file mode 100644
index 0000000..0a1e80c
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsarzone;
+
+import static org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
+import static org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
+
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
+import org.apache.inlong.sdk.commons.protocol.EventUtils;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PulsarClusterProducer
+ * <p>
+ * Sends {@link DispatchProfile} packages to a single Pulsar cache cluster. One {@link PulsarClient}
+ * is built per cluster in {@link #start()}, and one {@link Producer} per topic is created lazily by
+ * {@link #send(DispatchProfile)} and cached in {@code producerMap}.
+ */
+public class PulsarClusterProducer implements LifecycleAware {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarClusterProducer.class);
+
+    public static final String KEY_SERVICE_URL = "serviceUrl";
+    public static final String KEY_AUTHENTICATION = "authentication";
+
+    public static final String KEY_ENABLEBATCHING = "enableBatching";
+    public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes";
+    public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages";
+    public static final String KEY_BATCHINGMAXPUBLISHDELAY = "batchingMaxPublishDelay";
+    public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages";
+    public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS = "maxPendingMessagesAcrossPartitions";
+    public static final String KEY_SENDTIMEOUT = "sendTimeout";
+    public static final String KEY_COMPRESSIONTYPE = "compressionType";
+    public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull";
+    public static final String KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouter"
+            + "BatchingPartitionSwitchFrequency";
+
+    public static final String KEY_IOTHREADS = "ioThreads";
+    public static final String KEY_MEMORYLIMIT = "memoryLimit";
+    public static final String KEY_CONNECTIONSPERBROKER = "connectionsPerBroker";
+
+    private final String workerName;
+    private final CacheClusterConfig config;
+    private final PulsarZoneSinkContext sinkContext;
+    /** producer parameters, taken from {@link PulsarZoneSinkContext#getProducerContext()} */
+    private final Context context;
+    private final String cacheClusterName;
+    private LifecycleState state;
+
+    /**
+     * pulsar client; stays null if start() fails before the client is built
+     */
+    private PulsarClient client;
+    /** template builder, cloned for every per-topic producer */
+    private ProducerBuilder<byte[]> baseBuilder;
+
+    /** topic -> producer, filled lazily by send() */
+    private Map<String, Producer<byte[]>> producerMap = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor
+     *
+     * @param workerName name of the owning worker, used as a prefix of producer names
+     * @param config     cache cluster configuration (service URL, authentication token, cluster name)
+     * @param context    sink context supplying producer parameters, topic mapping, dispatch queue and metrics
+     */
+    public PulsarClusterProducer(String workerName, CacheClusterConfig config, PulsarZoneSinkContext context) {
+        this.workerName = workerName;
+        this.config = config;
+        this.sinkContext = context;
+        this.context = context.getProducerContext();
+        this.state = LifecycleState.IDLE;
+        this.cacheClusterName = config.getClusterName();
+    }
+
+    /**
+     * Build the Pulsar client and the template producer builder.
+     * <p>
+     * Failures are logged and swallowed. In that case {@code client} and/or {@code baseBuilder} may
+     * stay null, and later {@link #send(DispatchProfile)} calls fail to create a producer and offer
+     * their packages back to the dispatch queue.
+     */
+    @Override
+    public void start() {
+        this.state = LifecycleState.START;
+        try {
+            // create pulsar client
+            String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
+            String authentication = config.getParams().get(KEY_AUTHENTICATION);
+            this.client = PulsarClient.builder()
+                    .serviceUrl(serviceUrl)
+                    .authentication(AuthenticationFactory.token(authentication))
+                    .ioThreads(context.getInteger(KEY_IOTHREADS, 1))
+                    .memoryLimit(context.getLong(KEY_MEMORYLIMIT, 1073741824L), SizeUnit.BYTES)
+                    .connectionsPerBroker(context.getInteger(KEY_CONNECTIONSPERBROKER, 10))
+                    .build();
+            // template builder, cloned per topic in send()
+            this.baseBuilder = client.newProducer();
+            this.baseBuilder
+                    .sendTimeout(context.getInteger(KEY_SENDTIMEOUT, 0), TimeUnit.MILLISECONDS)
+                    .maxPendingMessages(context.getInteger(KEY_MAXPENDINGMESSAGES, 500))
+                    .maxPendingMessagesAcrossPartitions(
+                            context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 60000))
+                    .batchingMaxMessages(context.getInteger(KEY_BATCHINGMAXMESSAGES, 500))
+                    .batchingMaxPublishDelay(context.getInteger(KEY_BATCHINGMAXPUBLISHDELAY, 100),
+                            TimeUnit.MILLISECONDS)
+                    .batchingMaxBytes(context.getInteger(KEY_BATCHINGMAXBYTES, 131072));
+            this.baseBuilder
+                    .accessMode(ProducerAccessMode.Shared)
+                    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                    .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, true))
+                    .roundRobinRouterBatchingPartitionSwitchFrequency(
+                            context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 60))
+                    .enableBatching(context.getBoolean(KEY_ENABLEBATCHING, true))
+                    .compressionType(this.getPulsarCompressionType());
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Map the configured {@code compressionType} parameter to a Pulsar {@link CompressionType}.
+     * <p>
+     * A missing or unrecognized value maps to {@link CompressionType#NONE}. A missing key used to
+     * be a {@code switch(null)} NullPointerException that aborted start().
+     *
+     * @return CompressionType
+     */
+    private CompressionType getPulsarCompressionType() {
+        String type = this.context.getString(KEY_COMPRESSIONTYPE, CompressionType.NONE.name());
+        switch (type) {
+            case "LZ4":
+                return CompressionType.LZ4;
+            case "NONE":
+                return CompressionType.NONE;
+            case "ZLIB":
+                return CompressionType.ZLIB;
+            case "ZSTD":
+                return CompressionType.ZSTD;
+            case "SNAPPY":
+                return CompressionType.SNAPPY;
+            default:
+                return CompressionType.NONE;
+        }
+    }
+
+    /**
+     * Close every cached producer, then the client. Safe to call when start() failed.
+     */
+    @Override
+    public void stop() {
+        this.state = LifecycleState.STOP;
+        // close producers before the client that owns their connections
+        for (Entry<String, Producer<byte[]>> entry : this.producerMap.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (PulsarClientException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        this.producerMap.clear();
+        // client is null when start() failed before building it
+        if (this.client != null) {
+            try {
+                this.client.close();
+            } catch (PulsarClientException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * getLifecycleState
+     *
+     * @return current lifecycle state (IDLE after construction, START after start(), STOP after stop())
+     */
+    @Override
+    public LifecycleState getLifecycleState() {
+        return state;
+    }
+
+    /**
+     * Send one dispatch package asynchronously to the topic mapped from its uid.
+     * <p>
+     * If no topic is mapped, the package is counted as failed and not re-queued. If no producer
+     * can be obtained, if an exception occurs, or if the asynchronous send fails, the package is
+     * offered back to the dispatch queue and counted as failed.
+     *
+     * @param event dispatch package to send
+     * @return true if the message was handed to a producer (the final outcome is reported
+     *         asynchronously through metrics), false otherwise
+     */
+    public boolean send(DispatchProfile event) {
+        try {
+            // topic
+            String topic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
+            if (topic == null) {
+                sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
+                return false;
+            }
+            // get producer
+            Producer<byte[]> producer = this.producerMap.get(topic);
+            if (producer == null) {
+                producer = this.createProducer(topic);
+            }
+            // create producer failed
+            if (producer == null) {
+                sinkContext.getDispatchQueue().offer(event);
+                sinkContext.addSendResultMetric(event, topic, false, 0);
+                return false;
+            }
+            // headers
+            Map<String, String> headers = this.encodeCacheMessageHeaders(event);
+            // compress
+            byte[] bodyBytes = EventUtils.encodeCacheMessageBody(sinkContext.getCompressType(), event.getEvents());
+            // sendAsync
+            long sendTime = System.currentTimeMillis();
+            CompletableFuture<MessageId> future = producer.newMessage().properties(headers)
+                    .value(bodyBytes).sendAsync();
+            // callback
+            future.whenCompleteAsync((msgId, ex) -> {
+                if (ex != null) {
+                    LOG.error("Send fail:{}", ex.getMessage(), ex);
+                    // offer the package back to the dispatch queue
+                    sinkContext.getDispatchQueue().offer(event);
+                    sinkContext.addSendResultMetric(event, topic, false, sendTime);
+                } else {
+                    sinkContext.addSendResultMetric(event, topic, true, sendTime);
+                }
+            });
+            return true;
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            sinkContext.getDispatchQueue().offer(event);
+            sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
+            return false;
+        }
+    }
+
+    /**
+     * Create a producer for the topic and register it in {@code producerMap}.
+     * <p>
+     * If another thread registered a producer for the same topic first, that producer is returned
+     * and the one created here is closed. A failure while closing the redundant producer is only
+     * logged; it never causes the closed producer to be returned.
+     *
+     * @param topic pulsar topic
+     * @return the registered producer for the topic, or null if creation failed
+     */
+    private Producer<byte[]> createProducer(String topic) {
+        try {
+            LOG.info("try to new a object for topic {}", topic);
+            SecureRandom secureRandom = new SecureRandom(
+                    (workerName + "-" + cacheClusterName + "-" + topic + System.currentTimeMillis())
+                            .getBytes());
+            String producerName = workerName + "-" + cacheClusterName + "-" + topic + "-"
+                    + secureRandom.nextLong();
+            Producer<byte[]> producer = baseBuilder.clone().topic(topic)
+                    .producerName(producerName)
+                    .create();
+            LOG.info("create new producer success:{}", producer.getProducerName());
+            Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(topic, producer);
+            if (oldProducer == null) {
+                return producer;
+            }
+            // lost the race: keep the already registered producer and discard ours
+            try {
+                producer.close();
+                LOG.info("close producer success:{}", producer.getProducerName());
+            } catch (PulsarClientException e) {
+                LOG.error("close redundant producer failed", e);
+            }
+            return oldProducer;
+        } catch (Throwable ex) {
+            LOG.error("create new producer failed", ex);
+            return null;
+        }
+    }
+
+    /**
+     * Build the Pulsar message properties of the cache message protocol for one package.
+     *
+     * @param event dispatch package
+     * @return header map (version, group/stream id, proxy name, pack time, count, length, compress type)
+     */
+    public Map<String, String> encodeCacheMessageHeaders(DispatchProfile event) {
+        Map<String, String> headers = new HashMap<>();
+        // version int32 protocol version, the value is 1
+        headers.put(HEADER_KEY_VERSION, HEADER_CACHE_VERSION_1);
+        // inlongGroupId string inlongGroupId
+        headers.put(EventConstants.INLONG_GROUP_ID, event.getInlongGroupId());
+        // inlongStreamId string inlongStreamId
+        headers.put(EventConstants.INLONG_STREAM_ID, event.getInlongStreamId());
+        // proxyName string proxy node id, IP or container name
+        headers.put(EventConstants.HEADER_KEY_PROXY_NAME, sinkContext.getNodeId());
+        // packTime int64 pack time, milliseconds
+        headers.put(EventConstants.HEADER_KEY_PACK_TIME, String.valueOf(System.currentTimeMillis()));
+        // msgCount int32 message count
+        headers.put(EventConstants.HEADER_KEY_MSG_COUNT, String.valueOf(event.getEvents().size()));
+        // srcLength int32 total length of raw messages body
+        headers.put(EventConstants.HEADER_KEY_SRC_LENGTH, String.valueOf(event.getSize()));
+        // compressType int
+        // compress type of body data
+        // INLONG_NO_COMPRESS = 0,
+        // INLONG_GZ = 1,
+        // INLONG_SNAPPY = 2
+        headers.put(EventConstants.HEADER_KEY_COMPRESS_TYPE,
+                String.valueOf(sinkContext.getCompressType().getNumber()));
+        // messageKey string partition hash key, optional (not set here)
+        return headers;
+    }
+
+    /**
+     * get cacheClusterName
+     *
+     * @return the cacheClusterName
+     */
+    public String getCacheClusterName() {
+        return cacheClusterName;
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneProducer.java
new file mode 100644
index 0000000..b000a45
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneProducer.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsarzone;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PulsarZoneProducer
+ * <p>
+ * Spreads dispatch packages round-robin over one {@link PulsarClusterProducer} per configured cache
+ * cluster, and periodically reloads the cluster list from the sink context's cache holder.
+ */
+public class PulsarZoneProducer {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarZoneProducer.class);
+    /** the round-robin counter is reset to 0 once it exceeds this value */
+    public static final int MAX_INDEX = Integer.MAX_VALUE / 2;
+
+    private final String workerName;
+    private final PulsarZoneSinkContext context;
+    private Timer reloadTimer;
+
+    // replaced by reload() on the timer thread and read by send() on the sending thread,
+    // so it is volatile to make the new list visible
+    private volatile List<PulsarClusterProducer> clusterList = new ArrayList<>();
+    // clusters removed by the previous reload; stopped on the next reload or on close()
+    private List<PulsarClusterProducer> deletingClusterList = new ArrayList<>();
+
+    private AtomicInteger clusterIndex = new AtomicInteger(0);
+
+    /**
+     * Constructor
+     *
+     * @param workerName name of the owning worker, passed to each cluster producer
+     * @param context    sink context providing cache cluster configs and the reload interval
+     */
+    public PulsarZoneProducer(String workerName, PulsarZoneSinkContext context) {
+        this.workerName = workerName;
+        this.context = context;
+    }
+
+    /**
+     * Load the cluster list once and schedule periodic reloads. Failures are logged.
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Cancel the reload timer and stop every cluster producer, including clusters that were
+     * removed by the last reload but not yet stopped.
+     */
+    public synchronized void close() {
+        // timer is null if start() failed before scheduling it
+        if (this.reloadTimer != null) {
+            try {
+                this.reloadTimer.cancel();
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        for (PulsarClusterProducer cluster : this.clusterList) {
+            cluster.stop();
+        }
+        for (PulsarClusterProducer cluster : this.deletingClusterList) {
+            cluster.stop();
+        }
+        this.deletingClusterList.clear();
+    }
+
+    /**
+     * Schedule {@link #reload()} on a daemon timer, first after one reload interval and then
+     * every reload interval.
+     */
+    private void setReloadTimer() {
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+
+            public void run() {
+                reload();
+            }
+        };
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + context.getReloadInterval()),
+                context.getReloadInterval());
+    }
+
+    /**
+     * Reconcile the cluster producers with the current cache cluster configs.
+     * <p>
+     * Clusters removed by the previous reload are stopped first. New clusters are created and
+     * started. Clusters still configured are kept. Clusters no longer configured are moved to
+     * the deleting list and stopped on the next reload. Synchronized with {@link #close()}
+     * because this runs on the timer thread.
+     */
+    public synchronized void reload() {
+        try {
+            // stop deleted cluster
+            deletingClusterList.forEach(PulsarClusterProducer::stop);
+            deletingClusterList.clear();
+            // update cluster list
+            List<CacheClusterConfig> configList = this.context.getCacheHolder().getConfigList();
+            List<PulsarClusterProducer> newClusterList = new ArrayList<>(configList.size());
+            // prepare
+            Set<String> newClusterNames = new HashSet<>();
+            configList.forEach(item -> newClusterNames.add(item.getClusterName()));
+            Set<String> oldClusterNames = new HashSet<>();
+            clusterList.forEach(item -> oldClusterNames.add(item.getCacheClusterName()));
+            // add
+            for (CacheClusterConfig config : configList) {
+                if (!oldClusterNames.contains(config.getClusterName())) {
+                    PulsarClusterProducer cluster = new PulsarClusterProducer(workerName, config, context);
+                    cluster.start();
+                    newClusterList.add(cluster);
+                }
+            }
+            // remove
+            for (PulsarClusterProducer cluster : this.clusterList) {
+                if (newClusterNames.contains(cluster.getCacheClusterName())) {
+                    newClusterList.add(cluster);
+                } else {
+                    deletingClusterList.add(cluster);
+                }
+            }
+            this.clusterList = newClusterList;
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Send a package through the next cluster producer in round-robin order.
+     * <p>
+     * When no cluster is available (none configured, or loading failed), the package is offered
+     * back to the dispatch queue and counted as failed. The same handling applies when a cluster
+     * producer cannot create a producer. Previously this case threw ArithmeticException (modulo by zero).
+     *
+     * @param event dispatch package
+     * @return result of the chosen cluster producer's send, or false if no cluster is available
+     */
+    public boolean send(DispatchProfile event) {
+        List<PulsarClusterProducer> currentClusterList = this.clusterList;
+        int currentSize = currentClusterList.size();
+        if (currentSize == 0) {
+            context.getDispatchQueue().offer(event);
+            context.addSendResultMetric(event, event.getUid(), false, 0);
+            return false;
+        }
+        int currentIndex = clusterIndex.getAndIncrement();
+        if (currentIndex > MAX_INDEX) {
+            clusterIndex.set(0);
+        }
+        int realIndex = currentIndex % currentSize;
+        PulsarClusterProducer clusterProducer = currentClusterList.get(realIndex);
+        return clusterProducer.send(event);
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java
new file mode 100644
index 0000000..61aee51
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsarzone;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.dataproxy.dispatch.DispatchManager;
+import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
+import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PulsarZoneSink
+ * <p>
+ * Flume sink that takes {@link ProxyEvent}s from its channel and adds them to a
+ * {@link DispatchManager}. The manager packs them into {@link DispatchProfile}s on
+ * {@code dispatchQueue}, which is shared with the {@link PulsarZoneWorker}s through the sink context.
+ */
+public class PulsarZoneSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarZoneSink.class);
+
+    private Context parentContext;
+    private PulsarZoneSinkContext context;
+    private List<PulsarZoneWorker> workers = new ArrayList<>();
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // scheduled thread pool that periodically sets the dispatch manager's overtime-output flag
+    private ScheduledExecutorService scheduledPool;
+
+    /**
+     * Keep the Flume context; all objects are built in {@link #start()}.
+     *
+     * @param context sink configuration context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.parentContext = context;
+    }
+
+    /**
+     * Build the sink context and dispatch manager, schedule the overtime flag task and start the
+     * worker threads. Failures are logged; the sink is still marked started.
+     */
+    @Override
+    public void start() {
+        try {
+            this.context = new PulsarZoneSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
+            if (getChannel() == null) {
+                LOG.error("channel is null");
+            }
+            this.context.start();
+            this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+            this.scheduledPool = Executors.newScheduledThreadPool(2);
+            // dispatch
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    dispatchManager.setNeedOutputOvertimeData();
+                }
+            }, this.dispatchManager.getDispatchTimeout(), this.dispatchManager.getDispatchTimeout(),
+                    TimeUnit.MILLISECONDS);
+            // create worker
+            for (int i = 0; i < this.context.getMaxThreads(); i++) {
+                PulsarZoneWorker worker = new PulsarZoneWorker(this.getName(), i, this.context);
+                worker.start();
+                this.workers.add(worker);
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        super.start();
+    }
+
+    /**
+     * Close the workers, shut down the scheduled pool and close the sink context.
+     * Safe to call when start() failed part-way.
+     */
+    @Override
+    public void stop() {
+        for (PulsarZoneWorker worker : workers) {
+            try {
+                worker.close();
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        this.workers.clear();
+        // without shutdown the pool's threads outlive the sink
+        if (this.scheduledPool != null) {
+            this.scheduledPool.shutdown();
+        }
+        // context is null if its construction failed in start()
+        if (this.context != null) {
+            this.context.close();
+        }
+        super.stop();
+    }
+
+    /**
+     * Take one event from the channel inside a transaction and add it to the dispatch manager.
+     * Non-{@link ProxyEvent} events are committed (dropped) and counted via addSendFailMetric().
+     *
+     * @return READY when an event was consumed, BACKOFF when the channel was empty or on error
+     * @throws EventDeliveryException declared by the Sink contract
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (!(event instanceof ProxyEvent)) {
+                tx.commit();
+                this.context.addSendFailMetric();
+                return Status.READY;
+            }
+            ProxyEvent proxyEvent = (ProxyEvent) event;
+            this.dispatchManager.addEvent(proxyEvent);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
new file mode 100644
index 0000000..f867f1d
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsarzone;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
+import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
+import org.apache.inlong.dataproxy.sink.SinkContext;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
+
+/**
+ * PulsarZoneSinkContext
+ * <p>
+ * Shared state of a {@link PulsarZoneSink}: dispatch queue, producer parameters, id-to-topic and
+ * cache-cluster config holders, body compression type, and metric helpers.
+ */
+public class PulsarZoneSinkContext extends SinkContext {
+
+    public static final String KEY_NODE_ID = "nodeId";
+    public static final String PREFIX_PRODUCER = "producer.";
+    public static final String KEY_COMPRESS_TYPE = "compressType";
+
+    private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
+
+    private final String proxyClusterId;
+    private final String nodeId;
+    private final Context producerContext;
+    // config holders
+    private final IdTopicConfigHolder idTopicHolder;
+    private final CacheClusterConfigHolder cacheHolder;
+    private final INLONG_COMPRESSED_TYPE compressType;
+
+    /**
+     * Constructor
+     *
+     * @param sinkName      sink name
+     * @param context       sink configuration; "producer." sub-properties become the producer context
+     * @param channel       sink channel
+     * @param dispatchQueue queue of dispatch packages shared with the sink
+     * @throws IllegalArgumentException if the configured compressType is not an INLONG_COMPRESSED_TYPE name
+     */
+    public PulsarZoneSinkContext(String sinkName, Context context, Channel channel,
+            LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
+        super(sinkName, context, channel);
+        this.dispatchQueue = dispatchQueue;
+        // proxyClusterId
+        this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
+        // nodeId
+        this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
+        // compressionType, defaults to INLONG_SNAPPY
+        String strCompressionType = CommonPropertiesHolder.getString(KEY_COMPRESS_TYPE,
+                INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name());
+        this.compressType = INLONG_COMPRESSED_TYPE.valueOf(strCompressionType);
+        // producerContext
+        Map<String, String> producerParams = context.getSubProperties(PREFIX_PRODUCER);
+        this.producerContext = new Context(producerParams);
+        // idTopicHolder
+        this.idTopicHolder = new IdTopicConfigHolder();
+        this.idTopicHolder.configure(context);
+        // cacheHolder
+        this.cacheHolder = new CacheClusterConfigHolder();
+        this.cacheHolder.configure(context);
+    }
+
+    /**
+     * Start the base context and both config holders.
+     */
+    public void start() {
+        super.start();
+        this.idTopicHolder.start();
+        this.cacheHolder.start();
+    }
+
+    /**
+     * Close the base context and both config holders.
+     */
+    public void close() {
+        super.close();
+        this.idTopicHolder.close();
+        this.cacheHolder.close();
+    }
+
+    /**
+     * get proxyClusterId
+     *
+     * @return the proxyClusterId
+     */
+    public String getProxyClusterId() {
+        return proxyClusterId;
+    }
+
+    /**
+     * get dispatchQueue
+     *
+     * @return the dispatchQueue
+     */
+    public LinkedBlockingQueue<DispatchProfile> getDispatchQueue() {
+        return dispatchQueue;
+    }
+
+    /**
+     * get producerContext
+     *
+     * @return the producerContext
+     */
+    public Context getProducerContext() {
+        return producerContext;
+    }
+
+    /**
+     * get idTopicHolder
+     *
+     * @return the idTopicHolder
+     */
+    public IdTopicConfigHolder getIdTopicHolder() {
+        return idTopicHolder;
+    }
+
+    /**
+     * get cacheHolder
+     *
+     * @return the cacheHolder
+     */
+    public CacheClusterConfigHolder getCacheHolder() {
+        return cacheHolder;
+    }
+
+    /**
+     * get compressType
+     *
+     * @return the compressType
+     */
+    public INLONG_COMPRESSED_TYPE getCompressType() {
+        return compressType;
+    }
+
+    /**
+     * get nodeId
+     *
+     * @return the nodeId
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Add the package's event count and size to the send counters.
+     *
+     * @param currentRecord dispatch package
+     * @param bid           value of the sink-data-id dimension
+     */
+    public void addSendMetric(DispatchProfile currentRecord, String bid) {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        // metric
+        fillInlongId(currentRecord, dimensions);
+        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
+        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
+        long msgTime = currentRecord.getDispatchTime();
+        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+        DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
+        long count = currentRecord.getCount();
+        long size = currentRecord.getSize();
+        metricItem.sendCount.addAndGet(count);
+        metricItem.sendSize.addAndGet(size);
+    }
+
+    /**
+     * addSendFailMetric: increment readFailCount for an event that could not be processed
+     * (no group/stream dimensions, message time taken from the current clock).
+     */
+    public void addSendFailMetric() {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
+        long msgTime = System.currentTimeMillis();
+        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+        DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
+        metricItem.readFailCount.incrementAndGet();
+    }
+
+    /**
+     * Put the group and stream id dimensions, using "-" for blank values.
+     *
+     * @param currentRecord dispatch package
+     * @param dimensions    metric dimensions to fill
+     */
+    public static void fillInlongId(DispatchProfile currentRecord, Map<String, String> dimensions) {
+        String inlongGroupId = currentRecord.getInlongGroupId();
+        inlongGroupId = (StringUtils.isBlank(inlongGroupId)) ? "-" : inlongGroupId;
+        String inlongStreamId = currentRecord.getInlongStreamId();
+        inlongStreamId = (StringUtils.isBlank(inlongStreamId)) ? "-" : inlongStreamId;
+        dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, inlongGroupId);
+        dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId);
+    }
+
+    /**
+     * Record the outcome of sending one dispatch package.
+     * <p>
+     * On success, adds success count/size, audits every event and, when {@code sendTime > 0},
+     * adds the durations. Each duration is the sum over the package's events, so the sink and
+     * whole durations are weighted by the event count once. Previously they were added
+     * {@code count} times per event, which inflated them by a factor of the event count.
+     *
+     * @param currentRecord dispatch package
+     * @param bid           value of the sink-data-id dimension (topic or uid)
+     * @param result        true if the send succeeded
+     * @param sendTime      time the send started in ms, or 0 when no send was attempted
+     */
+    public void addSendResultMetric(DispatchProfile currentRecord, String bid, boolean result, long sendTime) {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        // metric
+        fillInlongId(currentRecord, dimensions);
+        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
+        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
+        long msgTime = currentRecord.getDispatchTime();
+        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+        DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
+        long count = currentRecord.getCount();
+        long size = currentRecord.getSize();
+        if (result) {
+            metricItem.sendSuccessCount.addAndGet(count);
+            metricItem.sendSuccessSize.addAndGet(size);
+            // NOTE(review): the send-success path records the READ_SUCCESS audit id; confirm
+            // whether a dedicated send-success audit id is intended here.
+            currentRecord.getEvents().forEach((event) -> {
+                AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event);
+            });
+            if (sendTime > 0) {
+                long currentTime = System.currentTimeMillis();
+                // identical for every event of the package: weight by the event count once
+                long sinkDuration = currentTime - sendTime;
+                long wholeDuration = currentTime - msgTime;
+                metricItem.sinkDuration.addAndGet(sinkDuration * count);
+                metricItem.wholeDuration.addAndGet(wholeDuration * count);
+                // depends on each event's own message time: sum it per event
+                long nodeDuration = currentRecord.getEvents().stream()
+                        .mapToLong((event) -> currentTime - event.getMsgTime())
+                        .sum();
+                metricItem.nodeDuration.addAndGet(nodeDuration);
+            }
+        } else {
+            metricItem.sendFailCount.addAndGet(count);
+            metricItem.sendFailSize.addAndGet(size);
+        }
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneWorker.java
new file mode 100644
index 0000000..35e54fb
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneWorker.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsarzone;
+
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PulsarZoneWorker
+ */
+public class PulsarZoneWorker extends Thread {
+
+ public static final Logger LOG = LoggerFactory.getLogger(PulsarZoneWorker.class);
+
+ private final String workerName;
+ private final PulsarZoneSinkContext context;
+
+ private PulsarZoneProducer zoneProducer;
+ private LifecycleState status;
+
+ /**
+ * Constructor
+ *
+ * @param sinkName
+ * @param workerIndex
+ * @param context
+ */
+ public PulsarZoneWorker(String sinkName, int workerIndex, PulsarZoneSinkContext context) {
+ super();
+ this.workerName = sinkName + "-worker-" + workerIndex;
+ this.context = context;
+ this.zoneProducer = new PulsarZoneProducer(workerName, this.context);
+ this.status = LifecycleState.IDLE;
+ }
+
+ /**
+ * start
+ */
+ @Override
+ public void start() {
+ this.zoneProducer.start();
+ this.status = LifecycleState.START;
+ super.start();
+ }
+
+ /**
+ *
+ * close
+ */
+ public void close() {
+ // close all producers
+ this.zoneProducer.close();
+ this.status = LifecycleState.STOP;
+ }
+
+ /**
+ * run
+ */
+ @Override
+ public void run() {
+ LOG.info(String.format("start PulsarZoneWorker:%s", this.workerName));
+ while (status != LifecycleState.STOP) {
+ try {
+ DispatchProfile event = context.getDispatchQueue().poll();
+ if (event == null) {
+ this.sleepOneInterval();
+ continue;
+ }
+ // metric
+ context.addSendMetric(event, workerName);
+ // send
+ this.zoneProducer.send(event);
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ this.sleepOneInterval();
+ }
+ }
+ }
+
+ /**
+ * sleepOneInterval
+ */
+ private void sleepOneInterval() {
+ try {
+ Thread.sleep(context.getProcessInterval());
+ } catch (InterruptedException e1) {
+ LOG.error(e1.getMessage(), e1);
+ }
+ }
+}