You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/10 11:22:12 UTC
[inlong] 02/03: [INLONG-7191][DataProxy] Remove unused code (#7192)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 9c9243d284437b4526c603194c54bfe9292edd51
Author: woofyzhao <zh...@gmail.com>
AuthorDate: Tue Jan 10 15:58:32 2023 +0800
[INLONG-7191][DataProxy] Remove unused code (#7192)
---
.../apache/inlong/dataproxy/sink/EventStat.java | 81 ---
.../apache/inlong/dataproxy/sink/PulsarSink.java | 725 ---------------------
.../dataproxy/sink/SimpleMessageTubeSink.java | 714 --------------------
.../org/apache/inlong/dataproxy/sink/TubeSink.java | 707 --------------------
.../dataproxy/sink/mqzone/AbstactZoneWorker.java | 108 ---
.../sink/mqzone/AbstractZoneClusterProducer.java | 115 ----
.../sink/mqzone/AbstractZoneProducer.java | 159 -----
.../dataproxy/sink/mqzone/AbstractZoneSink.java | 179 -----
.../sink/mqzone/AbstractZoneSinkContext.java | 405 ------------
.../sink/mqzone/ZoneClusterProducerCalculator.java | 27 -
.../sink/mqzone/ZoneWorkerCalculator.java | 23 -
.../impl/kafkazone/KafkaClusterProducer.java | 148 -----
.../mqzone/impl/kafkazone/KafkaZoneProducer.java | 50 --
.../sink/mqzone/impl/kafkazone/KafkaZoneSink.java | 45 --
.../impl/kafkazone/KafkaZoneSinkContext.java | 44 --
.../mqzone/impl/kafkazone/KafkaZoneWorker.java | 52 --
.../impl/pulsarzone/PulsarClusterProducer.java | 278 --------
.../mqzone/impl/pulsarzone/PulsarZoneProducer.java | 45 --
.../mqzone/impl/pulsarzone/PulsarZoneSink.java | 45 --
.../impl/pulsarzone/PulsarZoneSinkContext.java | 44 --
.../mqzone/impl/pulsarzone/PulsarZoneWorker.java | 52 --
.../mqzone/impl/tubezone/TubeClusterProducer.java | 205 ------
.../mqzone/impl/tubezone/TubeZoneProducer.java | 51 --
.../sink/mqzone/impl/tubezone/TubeZoneSink.java | 46 --
.../mqzone/impl/tubezone/TubeZoneSinkContext.java | 44 --
.../sink/mqzone/impl/tubezone/TubeZoneWorker.java | 51 --
.../sink/pulsar/CreatePulsarClientCallBack.java | 25 -
.../dataproxy/sink/pulsar/PulsarClientService.java | 563 ----------------
.../dataproxy/sink/pulsar/SendMessageCallBack.java | 33 -
.../inlong/dataproxy/sink/pulsar/SinkTask.java | 230 -------
.../pulsar/federation/PulsarFederationSink.java | 140 ----
.../federation/PulsarFederationSinkContext.java | 198 ------
.../pulsar/federation/PulsarFederationWorker.java | 148 -----
.../pulsar/federation/PulsarProducerCluster.java | 319 ---------
.../federation/PulsarProducerFederation.java | 163 -----
.../inlong/dataproxy/sink/TestPulsarSink.java | 56 --
.../apache/inlong/dataproxy/sink/TestTubeSink.java | 57 --
.../federation/TestPulsarFederationSink.java | 87 ---
.../federation/TestPulsarProducerFederation.java | 95 ---
39 files changed, 6557 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/EventStat.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/EventStat.java
deleted file mode 100644
index 3bf635e15..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/EventStat.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.flume.Event;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
-
-public class EventStat {
-
- private static long RETRY_INTERVAL_MS = 1 * 1000L;
- private Event event;
- private AtomicInteger myRetryCnt = new AtomicInteger(0);
- private boolean isOrderMessage = false;
- private boolean isSinkRspType = false;
-
- public EventStat(Event event) {
- this.event = event;
- this.isSinkRspType = MessageUtils.isSinkRspType(event);
- this.isOrderMessage = MessageUtils.isSyncSendForOrder(event);
- }
-
- public EventStat(Event event, int retryCnt) {
- this.event = event;
- this.myRetryCnt.set(retryCnt);
- this.isSinkRspType = MessageUtils.isSinkRspType(event);
- this.isOrderMessage = MessageUtils.isSyncSendForOrder(event);
- }
-
- public Event getEvent() {
- return event;
- }
-
- public void setEvent(Event event) {
- this.event = event;
- }
-
- public int getRetryCnt() {
- return myRetryCnt.get();
- }
-
- public void setRetryCnt(int retryCnt) {
- this.myRetryCnt.set(retryCnt);
- }
-
- public void incRetryCnt() {
- this.myRetryCnt.incrementAndGet();
- }
-
- public boolean isOrderMessage() {
- return isOrderMessage;
- }
-
- public boolean isSinkRspType() {
- return isSinkRspType;
- }
-
- public boolean shouldDrop() {
- return false;
- }
-
- public void reset() {
- this.event = null;
- this.myRetryCnt.set(0);
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
deleted file mode 100644
index 25cf65417..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ /dev/null
@@ -1,725 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink;
-
-import static org.apache.inlong.dataproxy.consts.AttrConstants.SEP_HASHTAG;
-
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.RateLimiter;
-import javax.annotation.Nonnull;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import io.netty.handler.codec.TooLongFrameException;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.monitor.LogCounter;
-import org.apache.inlong.common.monitor.MonitorIndex;
-import org.apache.inlong.common.monitor.MonitorIndexExt;
-import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.common.util.NetworkUtils;
-import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
-import org.apache.inlong.dataproxy.base.SinkRspEvent;
-import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
-import org.apache.inlong.dataproxy.sink.pulsar.CreatePulsarClientCallBack;
-import org.apache.inlong.dataproxy.sink.pulsar.PulsarClientService;
-import org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack;
-import org.apache.inlong.dataproxy.sink.pulsar.SinkTask;
-import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
-import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
-import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
-import org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Use pulsarSink need adding such config, if these ara not config in dataproxy-pulsar.conf,
- * PulsarSink will use default value.
- * <p/>
- *
- * Prefix of Pulsar sink config in flume.conf like this XXX.sinks.XXX.property and properties are may these
- * configurations:
- * <p/>
- * <code>type</code> (*): value must be 'org.apache.inlong.dataproxy.sink.PulsarSink'
- * <p/>
- * <code>pulsar_server_url_list</code> (*): value is pulsar broker url, like 'pulsar://127.0.0.1:6650'
- * <p/>
- * <code>send_timeout_MILL</code>: send message timeout, unit is millisecond, default is 30000 (mean 30s)
- * <p/>
- * <code>stat_interval_sec</code>: stat info will be made period time, unit is second, default is 60s
- * <p/>
- * <code>thread-num</code>: sink thread num, default is 8
- * <p/>
- * <code>client-id-cache</code>: whether the client uses cache, default is true
- * <p/>
- * <code>max_pending_messages</code>: default is 10000
- * <p/>
- * <code>max_batching_messages</code>: default is 1000
- * <p/>
- * <code>enable_batch</code>: default is true
- * <p/>
- * <code>block_if_queue_full</code>: default is true
- */
-public class PulsarSink extends AbstractSink implements Configurable, SendMessageCallBack, CreatePulsarClientCallBack {
-
- private static final Logger logger = LoggerFactory.getLogger(PulsarSink.class);
- /**
- * log tools
- */
- private static final LogCounter logPrinterB = new LogCounter(10, 100000, 60 * 1000);
- private static final LogCounter logPrinterC = new LogCounter(10, 100000, 60 * 1000);
- private static final String SEPARATOR = "#";
- private static final Long PRINT_INTERVAL = 30L;
-
- private static final PulsarPerformanceTask PULSAR_PERFORMANCE_TASK = new PulsarPerformanceTask();
- private static final LoadingCache<String, Long> AGENT_ID_CACHE = CacheBuilder.newBuilder()
- .concurrencyLevel(4 * 8).initialCapacity(500).expireAfterAccess(30, TimeUnit.SECONDS)
- .build(new CacheLoader<String, Long>() {
-
- @Nonnull
- @Override
- public Long load(@Nonnull String key) {
- return System.currentTimeMillis();
- }
- });
- /*
- * for stat
- */
- private static final AtomicLong TOTAL_PULSAR_SUCC_SEND_CNT = new AtomicLong(0);
- private static final AtomicLong TOTAL_PULSAR_SUCC_SEND_SIZE = new AtomicLong(0);
- private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors
- .newScheduledThreadPool(1, new HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
-
- static {
- SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(PULSAR_PERFORMANCE_TASK, 0L,
- PRINT_INTERVAL, TimeUnit.SECONDS);
- logger.info("success to start pulsar performance task");
- }
-
- private final AtomicLong currentInFlightCount = new AtomicLong(0);
- private final AtomicLong currentSuccessSendCnt = new AtomicLong(0);
- private final AtomicLong lastSuccessSendCnt = new AtomicLong(0);
- private final AtomicInteger processIndex = new AtomicInteger(0);
-
- private RateLimiter diskRateLimiter;
- private long t1 = System.currentTimeMillis();
- private int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT;
- /*
- * Control whether the SinkRunner thread can read data from the Channel
- */
- private volatile boolean canTake = false;
- private SinkCounter sinkCounter;
- /*
- * message queue and retry
- */
- private int eventQueueSize = 10000;
- private int badEventQueueSize = 10000;
- private int maxRetrySendCnt = 16;
- /*
- * send thread pool
- */
- private SinkTask[] sinkThreadPool;
- private int sinkThreadPoolSize;
- private PulsarClientService pulsarClientService;
- /*
- * statistic info log
- */
- private MonitorIndex monitorIndex;
- private MonitorIndexExt monitorIndexExt;
-
- /*
- * metric
- */
- private DataProxyMetricItemSet metricItemSet;
- private ConfigManager configManager;
- private Map<String, String> topicProperties;
- private Map<String, String> pulsarCluster;
- private MQClusterConfig pulsarConfig;
-
- public PulsarSink() {
- super();
- logger.debug("new instance of PulsarSink!");
- }
-
- /**
- * configure
- */
- @Override
- public void configure(Context context) {
- logger.info("PulsarSink started and context = {}", context.toString());
- // get maxMonitorCnt's configure value
- try {
- maxMonitorCnt = context.getInteger(
- ConfigConstants.MAX_MONITOR_CNT, ConfigConstants.DEF_MONITOR_STAT_CNT);
- } catch (NumberFormatException e) {
- logger.warn("Property {} must specify an integer value: {}",
- ConfigConstants.MAX_MONITOR_CNT, context.getString(ConfigConstants.MAX_MONITOR_CNT));
- }
- Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be >= 0");
- configManager = ConfigManager.getInstance();
- topicProperties = configManager.getTopicProperties();
- pulsarCluster = configManager.getMqClusterUrl2Token();
- pulsarConfig = configManager.getMqClusterConfig(); // pulsar common config
- sinkThreadPoolSize = pulsarConfig.getThreadNum();
- if (sinkThreadPoolSize <= 0) {
- sinkThreadPoolSize = 1;
- }
- pulsarClientService = new PulsarClientService(pulsarConfig, sinkThreadPoolSize);
-
- configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
-
- @Override
- public void update() {
- if (pulsarClientService != null) {
- diffSetPublish(pulsarClientService, new HashSet<>(topicProperties.values()),
- new HashSet<>(configManager.getTopicProperties().values()));
- }
- }
- });
- configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
-
- @Override
- public void update() {
- if (pulsarClientService != null) {
- diffUpdatePulsarClient(pulsarClientService, pulsarCluster, configManager.getMqClusterUrl2Token());
- }
- }
- });
- maxRetrySendCnt = pulsarConfig.getMaxRetryCnt();
- badEventQueueSize = pulsarConfig.getBadEventQueueSize();
- Preconditions.checkArgument(pulsarConfig.getThreadNum() > 0, "threadNum must be > 0");
- sinkThreadPool = new SinkTask[sinkThreadPoolSize];
- eventQueueSize = pulsarConfig.getEventQueueSize();
- if (pulsarConfig.getDiskIoRatePerSec() != 0) {
- diskRateLimiter = RateLimiter.create(pulsarConfig.getDiskIoRatePerSec());
- }
-
- if (sinkCounter == null) {
- sinkCounter = new SinkCounter(getName());
- }
- }
-
- private void initTopicSet(PulsarClientService pulsarClientService, Set<String> topicSet) {
- long startTime = System.currentTimeMillis();
- if (topicSet != null) {
- for (String topic : topicSet) {
- pulsarClientService.initTopicProducer(topic);
- }
- }
- logger.info(getName() + " initTopicSet cost: " + (System.currentTimeMillis() - startTime) + "ms");
- logger.info(getName() + " producer is ready for topics: " + pulsarClientService.getProducerInfoMap().keySet());
- }
-
- /**
- * When topic.properties is re-enabled, the producer update is triggered
- */
- public void diffSetPublish(PulsarClientService pulsarClientService,
- Set<String> curTopicSet, Set<String> newTopicSet) {
- boolean changed = false;
- // create producers for new topics
- for (String newTopic : newTopicSet) {
- if (!curTopicSet.contains(newTopic)) {
- changed = true;
- try {
- pulsarClientService.initTopicProducer(newTopic);
- } catch (Exception e) {
- logger.error("get producer failed: ", e);
- }
- }
- }
- // remove producers for deleted topics
- for (String oldTopic : curTopicSet) {
- if (!newTopicSet.contains(oldTopic)) {
- changed = true;
- try {
- pulsarClientService.destroyProducerByTopic(oldTopic);
- } catch (Exception e) {
- logger.error("remove producer failed: ", e);
- }
- }
- }
- if (changed) {
- topicProperties = configManager.getTopicProperties();
- logger.info("topics.properties has changed, trigger diff publish for {},"
- + " old topic set = {}, new topic set = {}, current topicProperties = {}",
- getName(), curTopicSet, newTopicSet, topicProperties);
- }
- }
-
- /**
- * When pulsarURLList change, close and restart
- */
- public void diffUpdatePulsarClient(PulsarClientService pulsarClientService, Map<String, String> originalCluster,
- Map<String, String> endCluster) {
- MapDifference<String, String> mapDifference = Maps.difference(originalCluster, endCluster);
- if (mapDifference.areEqual()) {
- return;
- }
-
- logger.info("pulsarConfig has changed, close unused url clients and start new url clients");
- Map<String, String> needToClose = new HashMap<>(mapDifference.entriesOnlyOnLeft());
- Map<String, String> needToStart = new HashMap<>(mapDifference.entriesOnlyOnRight());
- Map<String, MapDifference.ValueDifference<String>> differentToken = mapDifference.entriesDiffering();
- for (String url : differentToken.keySet()) {
- needToClose.put(url, originalCluster.get(url));
- needToStart.put(url, endCluster.get(url));// token changed
- }
-
- pulsarClientService.updatePulsarClients(this, needToClose, needToStart,
- new HashSet<>(topicProperties.values()));
-
- pulsarCluster = configManager.getMqClusterUrl2Token();
- if (!ConfigManager.getInstance().isMqClusterReady()) {
- ConfigManager.getInstance().updMqClusterStatus(true);
- logger.info("[{}] MQ Cluster service status ready!", getName());
- }
- }
-
- @Override
- public void start() {
- logger.info("[{}] pulsar sink starting...", getName());
- sinkCounter.start();
- pulsarClientService.initCreateConnection(this, getName());
-
- int statIntervalSec = pulsarConfig.getStatIntervalSec();
- Preconditions.checkArgument(statIntervalSec >= 0, "statIntervalSec must be >= 0");
- if (statIntervalSec > 0) {
- // switch for lots of metrics
- monitorIndex = new MonitorIndex("Pulsar_Sink", statIntervalSec, maxMonitorCnt);
- monitorIndexExt = new MonitorIndexExt("Pulsar_Sink_monitors#" + this.getName(),
- statIntervalSec, maxMonitorCnt);
- }
-
- super.start();
-
- try {
- initTopicSet(pulsarClientService, new HashSet<String>(topicProperties.values()));
- } catch (Exception e) {
- logger.info("pulsar sink start publish topic fail.", e);
- }
-
- for (int i = 0; i < sinkThreadPoolSize; i++) {
- sinkThreadPool[i] = new SinkTask(pulsarClientService, this,
- eventQueueSize / sinkThreadPoolSize,
- badEventQueueSize / sinkThreadPoolSize, i, true);
- sinkThreadPool[i].setName(getName() + "_pulsar_sink_sender-" + i);
- sinkThreadPool[i].start();
- }
- // register metricItemSet
- ConfigManager configManager = ConfigManager.getInstance();
- String clusterId =
- configManager.getCommonProperties().getOrDefault(
- ConfigConstants.PROXY_CLUSTER_NAME,
- ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
- this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
- MetricRegister.register(metricItemSet);
- this.canTake = true;
- logger.info("[{}] Pulsar sink started", getName());
- }
-
- @Override
- public void stop() {
- logger.info("pulsar sink stopping");
- this.canTake = false;
- int waitCount = 0;
- while (isAllSendFinished() && waitCount++ < 10) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- logger.info("Stop thread has been interrupt!");
- break;
- }
- }
- if (pulsarConfig.getStatIntervalSec() > 0) {
- try {
- monitorIndex.shutDown();
- } catch (Exception e) {
- logger.warn("stat runner interrupted");
- }
- }
- if (pulsarClientService != null) {
- pulsarClientService.close();
- }
- if (sinkThreadPool != null) {
- for (SinkTask thread : sinkThreadPool) {
- if (thread != null) {
- thread.close();
- thread.interrupt();
- }
- }
- sinkThreadPool = null;
- }
-
- super.stop();
- if (!SCHEDULED_EXECUTOR_SERVICE.isShutdown()) {
- SCHEDULED_EXECUTOR_SERVICE.shutdown();
- }
- sinkCounter.stop();
- logger.debug("pulsar sink stopped. Metrics:{}", sinkCounter);
- }
-
- private boolean isAllSendFinished() {
- for (int i = 0; i < sinkThreadPoolSize; i++) {
- if (!sinkThreadPool[i].isAllSendFinished()) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public Status process() throws EventDeliveryException {
- if (!this.canTake) {
- return Status.BACKOFF;
- }
-
- Status status = Status.READY;
- Channel channel = getChannel();
- Transaction tx = channel.getTransaction();
- tx.begin();
- try {
- Event event = channel.take();
- if (event != null) {
- if (diskRateLimiter != null) {
- diskRateLimiter.acquire(event.getBody().length);
- }
- if (!processEvent(new EventStat(event))) {
- logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
- + "--> pulsar,Check if pulsar server or network is ok.(if this situation "
- + "last long time it will cause memoryChannel full and fileChannel write.)", getName());
- tx.rollback();
- } else {
- tx.commit();
- }
- } else {
- status = Status.BACKOFF;
- tx.commit();
- }
- } catch (Throwable t) {
- logger.error("Process event failed!" + this.getName(), t);
- try {
- tx.rollback();
- } catch (Throwable e) {
- logger.error("pulsar sink transaction rollback exception", e);
-
- }
- } finally {
- tx.close();
- }
- return status;
- }
-
- @Override
- public void handleCreateClientSuccess(String url) {
- logger.info("createConnection success for url = {}", url);
- sinkCounter.incrementConnectionCreatedCount();
- }
-
- @Override
- public void handleCreateClientException(String url) {
- logger.info("createConnection has exception for url = {}", url);
- sinkCounter.incrementConnectionFailedCount();
- }
-
- @Override
- public void handleMessageSendSuccess(String topic, Object result,
- EventStat eventStat, long startTime) {
- /*
- * Statistics pulsar performance
- */
- TOTAL_PULSAR_SUCC_SEND_CNT.incrementAndGet();
- TOTAL_PULSAR_SUCC_SEND_SIZE.addAndGet(eventStat.getEvent().getBody().length);
- /*
- * add to sinkCounter
- */
- sinkCounter.incrementEventDrainSuccessCount();
- currentInFlightCount.decrementAndGet();
- currentSuccessSendCnt.incrementAndGet();
- long nowCnt = currentSuccessSendCnt.get();
- long oldCnt = lastSuccessSendCnt.get();
- long logEveryNEvents = pulsarConfig.getLogEveryNEvents();
- Preconditions.checkArgument(logEveryNEvents > 0, "logEveryNEvents must be > 0");
-
- if (nowCnt % logEveryNEvents == 0 && nowCnt != lastSuccessSendCnt.get()) {
- lastSuccessSendCnt.set(nowCnt);
- long t2 = System.currentTimeMillis();
- logger.info("Pulsar sink {}, succ put {} events to pulsar in the past {} millsec",
- getName(), (nowCnt - oldCnt), (t2 - t1));
- t1 = t2;
- }
- addStatistics(eventStat, true, startTime);
- if (eventStat.isSinkRspType()) {
- MessageUtils.sinkReturnRspPackage(
- (SinkRspEvent) eventStat.getEvent(), DataProxyErrCode.SUCCESS, "");
- }
- }
-
- @Override
- public void handleMessageSendException(String topic, EventStat eventStat,
- Object e, DataProxyErrCode errCode, String errMsg) {
- // decrease inflight count
- currentInFlightCount.decrementAndGet();
- // check whether retry send message
- boolean needRetry = true;
- if (e instanceof NotFoundException) {
- logger.error("NotFoundException for topic " + topic + ", message will be discard!", e);
- needRetry = false;
- } else if (e instanceof TooLongFrameException) {
- logger.error("TooLongFrameException, send failed for " + getName(), e);
- } else if (e instanceof ProducerQueueIsFullError) {
- logger.error("ProducerQueueIsFullError, send failed for " + getName(), e);
- } else if (!(e instanceof AlreadyClosedException
- || e instanceof PulsarClientException.NotConnectedException
- || e instanceof TopicTerminatedException)) {
- if (logPrinterB.shouldPrint()) {
- logger.error("send failed for " + getName(), e);
- }
- }
- addStatistics(eventStat, false, 0);
- if (eventStat.isSinkRspType()) {
- MessageUtils.sinkReturnRspPackage(
- (SinkRspEvent) eventStat.getEvent(), errCode, errMsg);
- } else {
- eventStat.incRetryCnt();
- if (needRetry) {
- processResendEvent(eventStat);
- }
- }
- }
-
- @Override
- public void handleRequestProcError(String topic, EventStat eventStat, boolean needRetry,
- DataProxyErrCode errCode, String errMsg) {
- if (logPrinterB.shouldPrint()) {
- logger.error(errMsg);
- }
- addStatistics(eventStat, false, 0);
- if (MessageUtils.isSinkRspType(eventStat.getEvent())) {
- MessageUtils.sinkReturnRspPackage(
- (SinkRspEvent) eventStat.getEvent(), errCode, errMsg);
- } else {
- eventStat.incRetryCnt();
- if (needRetry) {
- processResendEvent(eventStat);
- }
- }
- }
-
- /**
- * Add statistics information
- *
- * @param eventStat the statistic event
- * @param isSuccess is processed successfully
- * @param sendTime the send time when success processed
- */
- private void addStatistics(EventStat eventStat, boolean isSuccess, long sendTime) {
- if (eventStat == null || eventStat.getEvent() == null) {
- return;
- }
- Event event = eventStat.getEvent();
- // add jmx metric items;
- PulsarSink.this.metricItemSet.fillSinkSendMetricItemsByEvent(
- event, sendTime, isSuccess, event.getBody().length);
- if (isSuccess) {
- AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
- }
- if (pulsarConfig.getStatIntervalSec() <= 0) {
- return;
- }
- // add monitor items base file storage
- String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
- String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
- String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
- int intMsgCnt = Integer.parseInt(
- event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
- long dataTimeL = Long.parseLong(
- event.getHeaders().get(AttributeConstants.DATA_TIME));
- Pair<Boolean, String> evenProcType =
- MessageUtils.getEventProcType(event);
- // build statistic key
- StringBuilder newBase = new StringBuilder(512)
- .append(getName()).append(SEP_HASHTAG).append(topic)
- .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
- .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
- .append(SEP_HASHTAG).append(evenProcType.getRight()).append(SEP_HASHTAG)
- .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
- // count data
- if (isSuccess) {
- monitorIndex.addAndGet(newBase.toString(),
- intMsgCnt, 1, event.getBody().length, 0);
- monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");
- } else {
- monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP");
- monitorIndex.addAndGet(newBase.toString(),
- 0, 0, 0, intMsgCnt);
- }
- }
-
- private boolean processEvent(EventStat eventStat) {
- if (eventStat == null || eventStat.getEvent() == null) {
- return true;
- }
-
- boolean result = true;
- Event event = eventStat.getEvent();
- if (eventStat.isOrderMessage()) {
- String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
- SinkTask sinkTask =
- sinkThreadPool[Math.abs(partitionKey.hashCode()) % sinkThreadPoolSize];
- result = sinkTask.processEvent(eventStat);
- } else {
- int num = 0;
- do {
- int index = processIndex.getAndIncrement();
- SinkTask sinkTask = sinkThreadPool[index % sinkThreadPoolSize];
- if (sinkTask != null) {
- result = sinkTask.processEvent(eventStat);
- if (result) {
- break;
- }
- }
- num++;
- } while (num < sinkThreadPoolSize);
- }
- return result;
- }
-
- private void processResendEvent(EventStat eventStat) {
- try {
- if (eventStat == null || eventStat.getEvent() == null) {
- logger.warn("processResendEvent eventStat is null!");
- return;
- }
- /*
- * If the failure requires retransmission to pulsar, the sid needs to be removed before retransmission.
- */
- if (pulsarConfig.getClientIdCache()) {
- String clientId = eventStat.getEvent().getHeaders().get(ConfigConstants.SEQUENCE_ID);
- if (clientId != null && AGENT_ID_CACHE.asMap().containsKey(clientId)) {
- AGENT_ID_CACHE.invalidate(clientId);
- }
- }
- boolean result = false;
- int num = 0;
- do {
- int index = processIndex.getAndIncrement();
- SinkTask sinkTask = sinkThreadPool[index % sinkThreadPoolSize];
- if (sinkTask != null) {
- result = sinkTask.processReSendEvent(eventStat);
- if (result) {
- break;
- }
- }
- num++;
- } while (num < sinkThreadPoolSize);
- if (!result) {
- FailoverChannelProcessorHolder.getChannelProcessor()
- .processEvent(eventStat.getEvent());
- if (logPrinterC.shouldPrint()) {
- logger.error(getName() + " Channel --> pulsar --> ResendQueue(full) "
- + "-->FailOverChannelProcessor(current code point), "
- + "Resend queue is full,Check if pulsar server or network is ok.");
- }
- }
- } catch (Throwable throwable) {
- if (pulsarConfig.getStatIntervalSec() > 0) {
- monitorIndexExt.incrementAndGet("PULSAR_SINK_DROPPED");
- }
- if (logPrinterC.shouldPrint()) {
- logger.error(getName() + " Discard msg because put events to both of "
- + "queue and fileChannel fail", throwable);
- }
- }
- }
-
- public LoadingCache<String, Long> getAgentIdCache() {
- return AGENT_ID_CACHE;
- }
-
- public Map<String, String> getTopicsProperties() {
- return topicProperties;
- }
-
- public SinkCounter getSinkCounter() {
- return sinkCounter;
- }
-
- public AtomicLong getCurrentInFlightCount() {
- return currentInFlightCount;
- }
-
- public MQClusterConfig getPulsarConfig() {
- return pulsarConfig;
- }
-
- public int getMaxRetrySendCnt() {
- return maxRetrySendCnt;
- }
-
- static class PulsarPerformanceTask implements Runnable {
-
- @Override
- public void run() {
- try {
- if (TOTAL_PULSAR_SUCC_SEND_SIZE.get() != 0) {
- logger.info("Total pulsar performance tps :"
- + TOTAL_PULSAR_SUCC_SEND_CNT.get() / PRINT_INTERVAL
- + "/s, avg msg size:"
- + TOTAL_PULSAR_SUCC_SEND_SIZE.get() / TOTAL_PULSAR_SUCC_SEND_CNT.get()
- + ",print every " + PRINT_INTERVAL + " seconds");
- // totalPulsarSuccSendCnt represents the number of packets
- TOTAL_PULSAR_SUCC_SEND_CNT.set(0);
- TOTAL_PULSAR_SUCC_SEND_SIZE.set(0);
- }
-
- } catch (Exception e) {
- logger.info("pulsarPerformanceTask error", e);
- }
- }
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
deleted file mode 100644
index f8cbece8b..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
+++ /dev/null
@@ -1,714 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.flume.source.shaded.guava.RateLimiter;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.util.NetworkUtils;
-import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
-import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
-import org.apache.inlong.dataproxy.sink.common.TubeUtils;
-import org.apache.inlong.dataproxy.utils.Constants;
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
-import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
-import org.apache.inlong.tubemq.client.producer.MessageProducer;
-import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
-import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
-import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class SimpleMessageTubeSink extends AbstractSink implements Configurable {
-
- private static final Logger logger = LoggerFactory.getLogger(SimpleMessageTubeSink.class);
- private static int MAX_TOPICS_EACH_PRODUCER_HOLD = 200;
- private static final String TUBE_REQUEST_TIMEOUT = "tube-request-timeout";
- private static final String KEY_DISK_IO_RATE_PER_SEC = "disk-io-rate-per-sec";
-
- private static int BAD_EVENT_QUEUE_SIZE = 10000;
-
- private static final String SINK_THREAD_NUM = "thread-num";
- private static int EVENT_QUEUE_SIZE = 1000;
- private volatile boolean canTake = false;
- private volatile boolean canSend = false;
- private static int BATCH_SIZE = 10000;
- private static final int defaultRetryCnt = -1;
- private static final int defaultLogEveryNEvents = 100000;
- private static final int defaultSendTimeout = 20000; // in millsec
- private static final int defaultStatIntervalSec = 60;
- private static final int sendNewMetricRetryCount = 3;
-
- private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
- private static String TOPIC = "topic";
- private static String SEND_TIMEOUT = "send_timeout"; // in millsec
- private static String LOG_EVERY_N_EVENTS = "log-every-n-events";
- private static String RETRY_CNT = "retry-currentSuccSendedCnt";
- private static String STAT_INTERVAL_SEC = "stat-interval-sec"; // in sec
- private static String MAX_TOPICS_EACH_PRODUCER_HOLD_NAME = "max-topic-each-producer-hold";
-
- private static final String LOG_TOPIC = "proxy-log-topic";
- private static final String STREAMID = "proxy-log-streamid";
- private static final String GROUPID = "proxy-log-groupid";
- private static final String SEND_REMOTE = "send-remote";
- private static final String topicsFilePath = "topics.properties";
- private static final String slaTopicFilePath = "slaTopics.properties";
- private static final String SLA_METRIC_SINK = "sla-metric-sink";
-
- private static String MAX_SURVIVED_TIME = "max-survived-time";
- private static String MAX_SURVIVED_SIZE = "max-survived-size";
- private static String CLIENT_ID_CACHE = "client-id-cache";
-
- private String proxyLogTopic = "teg_manager";
- private String proxyLogGroupId = "b_teg_manager";
- private String proxyLogStreamId = "proxy_measure_log";
- private boolean sendRemote = false;
- private ConfigManager configManager;
- private Map<String, String> topicProperties;
-
- public MessageProducer producer;
- public Map<String, MessageProducer> producerMap;
-
- private LinkedBlockingQueue<EventStat> resendQueue;
- private LinkedBlockingQueue<Event> eventQueue;
-
- private long diskIORatePerSec;
- private RateLimiter diskRateLimiter;
-
- public AtomicInteger currentPublishTopicNum = new AtomicInteger(0);
- public TubeMultiSessionFactory sessionFactory;
- private String masterHostAndPortList;
- private Integer logEveryNEvents;
- private Integer sendTimeout;
- private static int retryCnt = defaultRetryCnt;
- private int requestTimeout = 60;
- private int threadNum;
- private Thread[] sinkThreadPool;
-
- private String metaTopicFilePath = topicsFilePath;
- private long linkMaxAllowedDelayedMsgCount;
- private long sessionWarnDelayedMsgCount;
- private long sessionMaxAllowedDelayedMsgCount;
- private long nettyWriteBufferHighWaterMark;
- private int recoverthreadcount;
- //
- private Map<String, String> dimensions;
- private DataProxyMetricItemSet metricItemSet;
- private static final MsgDedupHandler msgDedupHandler = new MsgDedupHandler();
- private static ConcurrentHashMap<String, Long> illegalTopicMap =
- new ConcurrentHashMap<String, Long>();
-
- private boolean overflow = false;
-
- /**
- * diff publish
- *
- * @param originalSet
- * @param endSet
- */
- public void diffSetPublish(Set<String> originalSet, Set<String> endSet) {
-
- boolean changed = false;
- for (String s : endSet) {
- if (!originalSet.contains(s)) {
- changed = true;
- try {
- producer = getProducer(s);
- } catch (Exception e) {
- logger.error("Get producer failed!", e);
- }
- }
- }
-
- if (changed) {
- logger.info("topics.properties has changed, trigger diff publish for {}", getName());
- topicProperties = configManager.getTopicProperties();
- }
- }
-
- private MessageProducer getProducer(String topic) throws TubeClientException {
- if (producerMap.containsKey(topic)) {
- return producerMap.get(topic);
- } else {
- synchronized (this) {
- if (!producerMap.containsKey(topic)) {
- if (producer == null || currentPublishTopicNum.get() >= MAX_TOPICS_EACH_PRODUCER_HOLD) {
- producer = sessionFactory.createProducer();
- currentPublishTopicNum.set(0);
- }
- // publish topic
- producer.publish(topic);
- producerMap.put(topic, producer);
- currentPublishTopicNum.incrementAndGet();
- }
- }
- return producerMap.get(topic);
- }
- }
-
- private TubeClientConfig initTubeConfig() throws Exception {
- final TubeClientConfig tubeClientConfig = new TubeClientConfig(NetworkUtils.getLocalIp(),
- this.masterHostAndPortList);
- tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(linkMaxAllowedDelayedMsgCount);
- tubeClientConfig.setSessionWarnDelayedMsgCount(sessionWarnDelayedMsgCount);
- tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(sessionMaxAllowedDelayedMsgCount);
- tubeClientConfig.setNettyWriteBufferHighWaterMark(nettyWriteBufferHighWaterMark);
- tubeClientConfig.setHeartbeatPeriodMs(15000L);
- tubeClientConfig.setRpcTimeoutMs(20000L);
-
- return tubeClientConfig;
- }
-
- /**
- * If this function is called successively without calling {@see #destroyConnection()}, only the
- * first call has any effect.
- *
- * @throws FlumeException if an RPC client connection could not be opened
- */
- private void createConnection() throws FlumeException {
- // synchronized (tubeSessionLock) {
- // if already connected, just skip
- if (sessionFactory != null) {
- return;
- }
-
- try {
- TubeClientConfig conf = initTubeConfig();
- // sessionFactory = new TubeMutilMessageSessionFactory(conf);
- sessionFactory = new TubeMultiSessionFactory(conf);
- } catch (TubeClientException e) {
- logger.error("create connnection error in metasink, "
- + "maybe tube master set error, please re-check. ex1 {}", e.getMessage());
- throw new FlumeException("connect to Tube error1, "
- + "maybe zkstr/zkroot set error, please re-check");
- } catch (Throwable e) {
- logger.error("create connnection error in metasink, "
- + "maybe tube master set error/shutdown in progress, please re-check. ex2 {}",
- e.getMessage());
- throw new FlumeException("connect to meta error2, "
- + "maybe tube master set error/shutdown in progress, please re-check");
- }
-
- if (producerMap == null) {
- producerMap = new HashMap<String, MessageProducer>();
- }
- logger.debug("building tube producer");
- // }
- }
-
- private void destroyConnection() {
- for (Map.Entry<String, MessageProducer> entry : producerMap.entrySet()) {
- MessageProducer producer = entry.getValue();
- try {
- producer.shutdown();
- } catch (TubeClientException e) {
- logger.error("destroy producer error in metasink, MetaClientException {}", e.getMessage());
- } catch (Throwable e) {
- logger.error("destroy producer error in metasink, ex {}", e.getMessage());
- }
- }
- producerMap.clear();
-
- if (sessionFactory != null) {
- try {
- sessionFactory.shutdown();
- } catch (TubeClientException e) {
- logger.error("destroy sessionFactory error in metasink, MetaClientException {}",
- e.getMessage());
- } catch (Exception e) {
- logger.error("destroy sessionFactory error in metasink, ex {}", e.getMessage());
- }
- }
- sessionFactory = null;
- logger.debug("closed meta producer");
- }
-
- private void initTopicSet(Set<String> topicSet) throws Exception {
- List<String> sortedList = new ArrayList(topicSet);
- Collections.sort(sortedList);
- int cycle = sortedList.size() / MAX_TOPICS_EACH_PRODUCER_HOLD;
- int remainder = sortedList.size() % MAX_TOPICS_EACH_PRODUCER_HOLD;
- long startTime = System.currentTimeMillis();
- for (int i = 0; i <= cycle; i++) {
- Set<String> subset = new HashSet<String>();
- int startIndex = i * MAX_TOPICS_EACH_PRODUCER_HOLD;
- int endIndex = startIndex + MAX_TOPICS_EACH_PRODUCER_HOLD - 1;
- if (i == cycle) {
- if (remainder == 0) {
- continue;
- } else {
- endIndex = startIndex + remainder - 1;
- }
- }
- for (int index = startIndex; index <= endIndex; index++) {
- subset.add(sortedList.get(index));
- }
- producer = sessionFactory.createProducer();
- try {
- Set<String> succTopicSet = producer.publish(subset);
- if (succTopicSet != null) {
- for (String succTopic : succTopicSet) {
- producerMap.put(succTopic, producer);
- }
- currentPublishTopicNum.set(succTopicSet.size());
- logger.info(getName() + " success Subset : " + succTopicSet);
- }
- } catch (Exception e) {
- logger.info(getName() + " meta sink initTopicSet fail.", e);
- }
- }
- logger.info(getName() + " initTopicSet cost: " + (System.currentTimeMillis() - startTime) + "ms");
- logger.info(getName() + " producer is ready for topics : " + producerMap.keySet());
- }
-
- @Override
- public void start() {
- this.dimensions = new HashMap<>();
- this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
- this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
- // register metrics
- this.metricItemSet = new DataProxyMetricItemSet(this.getName());
- MetricRegister.register(metricItemSet);
-
- // create tube connection
- try {
- createConnection();
- } catch (FlumeException e) {
- logger.error("Unable to create tube client" + ". Exception follows.", e);
-
- /* Try to prevent leaking resources. */
- destroyConnection();
-
- /* FIXME: Mark ourselves as failed. */
- stop();
- return;
- }
-
- super.start();
- this.canSend = true;
- this.canTake = true;
-
- try {
- initTopicSet(new HashSet<String>(topicProperties.values()));
- } catch (Exception e) {
- logger.info("meta sink start publish topic fail.", e);
- }
-
- for (int i = 0; i < sinkThreadPool.length; i++) {
- sinkThreadPool[i] = new Thread(new SinkTask(), getName() + "_tube_sink_sender-" + i);
- sinkThreadPool[i].start();
- }
-
- }
-
- class SinkTask implements Runnable {
-
- private void sendMessage(Event event, String topic, AtomicBoolean flag, EventStat es)
- throws TubeClientException, InterruptedException {
- if (msgDedupHandler.judgeDupAndPutMsgSeqId(
- event.getHeaders().get(ConfigConstants.SEQUENCE_ID))) {
- logger.info("{} agent package {} existed,just discard.",
- getName(), event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
- } else {
- producer.sendMessage(TubeUtils.buildMessage(topic, event), new MyCallback(es));
- flag.set(true);
- }
- illegalTopicMap.remove(topic);
- }
-
- private void handleException(Throwable t, String topic, boolean decrementFlag, EventStat es) {
- if (t instanceof TubeClientException) {
- String message = t.getMessage();
- if (message != null && (message.contains("No available queue for topic")
- || message.contains("The brokers of topic are all forbidden"))) {
- illegalTopicMap.put(topic, System.currentTimeMillis() + 60 * 1000);
- logger.info("IllegalTopicMap.put " + topic);
- return;
- } else {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // ignore..
- }
- }
- }
- logger.error("Sink task fail to send the message, decrementFlag=" + decrementFlag + ",sink.name="
- + Thread.currentThread().getName()
- + ",event.headers=" + es.getEvent().getHeaders(), t);
- }
-
- @Override
- public void run() {
- logger.info("Sink task {} started.", Thread.currentThread().getName());
- while (canSend) {
- boolean decrementFlag = false;
- boolean resendBadEvent = false;
- Event event = null;
- EventStat es = null;
- String topic = null;
- try {
- if (SimpleMessageTubeSink.this.overflow) {
- SimpleMessageTubeSink.this.overflow = false;
- Thread.sleep(10);
- }
- if (!resendQueue.isEmpty()) {
- es = resendQueue.poll();
- if (es != null) {
- event = es.getEvent();
- // logger.warn("Resend event: {}", event.toString());
- if (event.getHeaders().containsKey(TOPIC)) {
- topic = event.getHeaders().get(TOPIC);
- }
- resendBadEvent = true;
- }
- } else {
- event = eventQueue.take();
- es = new EventStat(event);
- // sendCnt.incrementAndGet();
- if (event.getHeaders().containsKey(TOPIC)) {
- topic = event.getHeaders().get(TOPIC);
- }
- }
-
- if (event == null) {
- // ignore event is null, when multiple-thread SinkTask running
- // this null value comes from resendQueue
- continue;
- }
-
- if (topic == null || topic.equals("")) {
- logger.warn("no topic specified in event header, just skip this event");
- continue;
- }
-
- Long expireTime = illegalTopicMap.get(topic);
- if (expireTime != null) {
- long currentTime = System.currentTimeMillis();
- if (expireTime > currentTime) {
-
- // TODO: need to be improved.
- // reChannelEvent(es, topic);
- continue;
- } else {
-
- illegalTopicMap.remove(topic);
- }
- }
- MessageProducer producer = null;
- try {
- producer = getProducer(topic);
- } catch (Exception e) {
- logger.error("Get producer failed!", e);
- }
-
- if (producer == null) {
- illegalTopicMap.put(topic, System.currentTimeMillis() + 30 * 1000);
- continue;
- }
-
- AtomicBoolean flagAtomic = new AtomicBoolean(decrementFlag);
- sendMessage(event, topic, flagAtomic, es);
- decrementFlag = flagAtomic.get();
-
- } catch (InterruptedException e) {
- logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
- return;
- } catch (Throwable t) {
- handleException(t, topic, decrementFlag, es);
- resendEvent(es, decrementFlag);
- }
- }
- }
- }
-
- public class MyCallback implements MessageSentCallback {
-
- private EventStat myEventStat;
- private long sendTime;
-
- public MyCallback(EventStat eventStat) {
- this.myEventStat = eventStat;
- this.sendTime = System.currentTimeMillis();
- }
-
- @Override
- public void onMessageSent(final MessageSentResult result) {
- if (result.isSuccess()) {
- // TODO: add stats
- this.addMetric(myEventStat.getEvent(), true, sendTime);
- } else {
- this.addMetric(myEventStat.getEvent(), false, 0);
- if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) {
- logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
- result.getErrMsg(), resendQueue.size(),
- myEventStat.getEvent().hashCode());
-
- return;
- }
- if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW) {
- logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
- result.getErrMsg(), resendQueue.size(),
- myEventStat.getEvent().hashCode());
- }
- resendEvent(myEventStat, true);
- }
- }
-
- /**
- * addMetric
- *
- * @param event
- * @param result
- * @param sendTime
- */
- private void addMetric(Event event, boolean result, long sendTime) {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, SimpleMessageTubeSink.this.getName());
- dimensions.put(DataProxyMetricItem.KEY_SINK_ID, SimpleMessageTubeSink.this.getName());
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, ""));
- DataProxyMetricItem.fillInlongId(event, dimensions);
- DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
-
- DataProxyMetricItem metricItem = SimpleMessageTubeSink.this.metricItemSet.findMetricItem(dimensions);
- if (result) {
- metricItem.sendSuccessCount.incrementAndGet();
- metricItem.sendSuccessSize.addAndGet(event.getBody().length);
- AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
- if (sendTime > 0) {
- long currentTime = System.currentTimeMillis();
- long msgTime = NumberUtils.toLong(event.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
- sendTime);
- long sinkDuration = currentTime - sendTime;
- long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
- long wholeDuration = currentTime - msgTime;
- metricItem.sinkDuration.addAndGet(sinkDuration);
- metricItem.nodeDuration.addAndGet(nodeDuration);
- metricItem.wholeDuration.addAndGet(wholeDuration);
- }
- } else {
- metricItem.sendFailCount.incrementAndGet();
- metricItem.sendFailSize.addAndGet(event.getBody().length);
- }
- }
-
- @Override
- public void onException(final Throwable e) {
- Throwable t = e;
- while (t.getCause() != null) {
- t = t.getCause();
- }
- if (t instanceof OverflowException) {
- SimpleMessageTubeSink.this.overflow = true;
- }
- resendEvent(myEventStat, true);
- }
- }
-
- /**
- * resend event
- *
- * @param es
- * @param isDecrement
- */
- private void resendEvent(EventStat es, boolean isDecrement) {
- try {
- if (es == null || es.getEvent() == null) {
- return;
- }
- msgDedupHandler.invalidMsgSeqId(
- es.getEvent().getHeaders().get(ConfigConstants.SEQUENCE_ID));
- } catch (Throwable throwable) {
- logger.error(getName() + " Discard msg because put events to both of queue and "
- + "fileChannel fail,current resendQueue.size = "
- + resendQueue.size(), throwable);
- }
- }
-
- @Override
- public Status process() throws EventDeliveryException {
- if (!this.canTake) {
- return Status.BACKOFF;
- }
- Status status = Status.READY;
- Channel channel = getChannel();
- Transaction tx = channel.getTransaction();
- tx.begin();
- try {
- Event event = channel.take();
- if (event != null) {
- if (diskRateLimiter != null) {
- diskRateLimiter.acquire(event.getBody().length);
- }
- if (!eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
- logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
- + "--> Tube,Check if Tube server or network is ok.(if this situation last long time "
- + "it will cause memoryChannel full and fileChannel write.)", getName());
- tx.rollback();
- } else {
- tx.commit();
- // metric
- if (event.getHeaders().containsKey(TOPIC)) {
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().get(TOPIC));
- } else {
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
- }
- DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
- metricItem.readSuccessCount.incrementAndGet();
- metricItem.readSuccessSize.addAndGet(event.getBody().length);
- }
- } else {
-
- // logger.info("[{}]No data to process in the channel.",getName());
- status = Status.BACKOFF;
- tx.commit();
- }
- } catch (Throwable t) {
- logger.error("Process event failed!" + this.getName(), t);
- try {
- tx.rollback();
- } catch (Throwable e) {
- logger.error("metasink transaction rollback exception", e);
-
- }
- } finally {
- tx.close();
- }
- return status;
- }
-
- @Override
- public void configure(Context context) {
- logger.info(context.toString());
- // logger.info("sinktest:"+getName()+getChannel());//sinktest:meta-sink-msg2null
-
- configManager = ConfigManager.getInstance();
- topicProperties = configManager.getTopicProperties();
- configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
-
- @Override
- public void update() {
-
- diffSetPublish(new HashSet<String>(topicProperties.values()),
- new HashSet<String>(configManager.getTopicProperties().values()));
- }
- });
-
- masterHostAndPortList = context.getString(MASTER_HOST_PORT_LIST);
- Preconditions.checkState(masterHostAndPortList != null, "No master and port list specified");
-
- producerMap = new HashMap<String, MessageProducer>();
-
- logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS, defaultLogEveryNEvents);
- logger.debug(this.getName() + " " + LOG_EVERY_N_EVENTS + " " + logEveryNEvents);
- Preconditions.checkArgument(logEveryNEvents > 0, "logEveryNEvents must be > 0");
-
- sendTimeout = context.getInteger(SEND_TIMEOUT, defaultSendTimeout);
- logger.debug(this.getName() + " " + SEND_TIMEOUT + " " + sendTimeout);
- Preconditions.checkArgument(sendTimeout > 0, "sendTimeout must be > 0");
-
- MAX_TOPICS_EACH_PRODUCER_HOLD = context.getInteger(MAX_TOPICS_EACH_PRODUCER_HOLD_NAME, 200);
- retryCnt = context.getInteger(RETRY_CNT, defaultRetryCnt);
- logger.debug(this.getName() + " " + RETRY_CNT + " " + retryCnt);
-
- boolean isSlaMetricSink = context.getBoolean(SLA_METRIC_SINK, false);
- if (isSlaMetricSink) {
- this.metaTopicFilePath = slaTopicFilePath;
- }
- // start message deduplication handler
- msgDedupHandler.start(context.getBoolean(CLIENT_ID_CACHE, false),
- context.getInteger(MAX_SURVIVED_TIME, -1),
- context.getInteger(MAX_SURVIVED_SIZE, -1));
-
- String requestTimeout = context.getString(TUBE_REQUEST_TIMEOUT);
- if (requestTimeout != null) {
- this.requestTimeout = Integer.parseInt(requestTimeout);
- }
-
- String sendRemoteStr = context.getString(SEND_REMOTE);
- if (sendRemoteStr != null) {
- sendRemote = Boolean.parseBoolean(sendRemoteStr);
- }
- if (sendRemote) {
- proxyLogTopic = context.getString(LOG_TOPIC, proxyLogTopic);
- proxyLogGroupId = context.getString(GROUPID, proxyLogStreamId);
- proxyLogStreamId = context.getString(STREAMID, proxyLogStreamId);
- }
-
- resendQueue = new LinkedBlockingQueue<>(BAD_EVENT_QUEUE_SIZE);
-
- String sinkThreadNum = context.getString(SINK_THREAD_NUM, "4");
- threadNum = Integer.parseInt(sinkThreadNum);
- Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
- sinkThreadPool = new Thread[threadNum];
- eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_SIZE);
-
- diskIORatePerSec = context.getLong(KEY_DISK_IO_RATE_PER_SEC, 0L);
- if (diskIORatePerSec != 0) {
- diskRateLimiter = RateLimiter.create(diskIORatePerSec);
- }
-
- linkMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT,
- 80000L);
- sessionWarnDelayedMsgCount = context.getLong(ConfigConstants.SESSION_WARN_DELAYED_MSG_COUNT,
- 2000000L);
- sessionMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT,
- 4000000L);
- nettyWriteBufferHighWaterMark = context.getLong(ConfigConstants.NETTY_WRITE_BUFFER_HIGH_WATER_MARK,
- 15 * 1024 * 1024L);
- recoverthreadcount = context.getInteger(ConfigConstants.RECOVER_THREAD_COUNT,
- Runtime.getRuntime().availableProcessors() + 1);
- }
-
- /**
- * get metricItemSet
- * @return the metricItemSet
- */
- public DataProxyMetricItemSet getMetricItemSet() {
- return metricItemSet;
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
deleted file mode 100644
index 88da9d079..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ /dev/null
@@ -1,707 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink;
-
-import static org.apache.inlong.dataproxy.consts.AttrConstants.SEP_HASHTAG;
-
-import com.google.common.base.Preconditions;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.collections.SetUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.flume.source.shaded.guava.RateLimiter;
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.monitor.LogCounter;
-import org.apache.inlong.common.monitor.MonitorIndex;
-import org.apache.inlong.common.monitor.MonitorIndexExt;
-import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.common.util.NetworkUtils;
-import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
-import org.apache.inlong.dataproxy.base.SinkRspEvent;
-import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
-import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
-import org.apache.inlong.dataproxy.sink.common.TubeProducerHolder;
-import org.apache.inlong.dataproxy.sink.common.TubeUtils;
-import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
-import org.apache.inlong.tubemq.client.producer.MessageProducer;
-import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
-import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TubeSink extends AbstractSink implements Configurable {
-
- private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
- private static final MsgDedupHandler MSG_DEDUP_HANDLER = new MsgDedupHandler();
- private TubeProducerHolder producerHolder = null;
- private volatile boolean canSend = false;
- private volatile boolean isOverFlow = false;
- private ConfigManager configManager;
- private Map<String, String> topicProperties;
- private MQClusterConfig tubeConfig;
- private String usedMasterAddr = null;
- private Set<String> masterHostAndPortLists;
- // statistic info log
- private int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT;
- private int statIntervalSec = 60;
- private MonitorIndex monitorIndex;
- private MonitorIndexExt monitorIndexExt;
- private static final String KEY_SINK_DROPPED = "TUBE_SINK_DROPPED";
- private static final String KEY_SINK_SUCCESS = "TUBE_SINK_SUCCESS";
- private static final String KEY_SINK_FAILURE = "TUBE_SINK_FAILURE";
- private static final String KEY_SINK_EXP = "TUBE_SINK_EXP";
- // used for RoundRobin different cluster while send message
- private RateLimiter diskRateLimiter;
- private Thread[] sinkThreadPool;
- private DataProxyMetricItemSet metricItemSet;
- private final AtomicBoolean started = new AtomicBoolean(false);
- private static final LogCounter LOG_SINK_TASK_PRINTER =
- new LogCounter(10, 100000, 60 * 1000);
- private LinkedBlockingQueue<Event> eventQueue;
- private LinkedBlockingQueue<EventStat> resendQueue;
- private final AtomicLong cachedMsgCnt = new AtomicLong(0);
- private final AtomicLong takenMsgCnt = new AtomicLong(0);
- private final AtomicLong resendMsgCnt = new AtomicLong(0);
- private final AtomicLong blankTopicDiscardMsgCnt = new AtomicLong(0);
- private final AtomicLong frozenTopicDiscardMsgCnt = new AtomicLong(0);
- private final AtomicLong dupDiscardMsgCnt = new AtomicLong(0);
- private final AtomicLong inflightMsgCnt = new AtomicLong(0);
- private final AtomicLong successMsgCnt = new AtomicLong(0);
- // statistic thread
- private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors
- .newScheduledThreadPool(1, new HighPriorityThreadFactory("tubeSink-Printer-thread"));
-
- {
- SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(new TubeStatsTask(), 30L,
- 60L, TimeUnit.SECONDS);
- logger.info("success to start performance statistic task!");
- }
-
- @Override
- public void configure(Context context) {
- logger.info(getName() + " configure from context: {}", context);
- // initial parameters
- configManager = ConfigManager.getInstance();
- tubeConfig = configManager.getMqClusterConfig();
- topicProperties = configManager.getTopicProperties();
- masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet();
- // only use first cluster address now
- usedMasterAddr = getFirstClusterAddr(masterHostAndPortLists);
- // create producer holder
- if (usedMasterAddr != null) {
- producerHolder = new TubeProducerHolder(getName(),
- usedMasterAddr, configManager.getMqClusterConfig());
- }
- // start message deduplication handler
- MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(),
- tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize());
- // get maxMonitorCnt's configure value
- try {
- maxMonitorCnt = context.getInteger(
- ConfigConstants.MAX_MONITOR_CNT, ConfigConstants.DEF_MONITOR_STAT_CNT);
- } catch (NumberFormatException e) {
- logger.warn("Property {} must specify an integer value: {}",
- ConfigConstants.MAX_MONITOR_CNT, context.getString(ConfigConstants.MAX_MONITOR_CNT));
- }
- Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be >= 0");
- statIntervalSec = tubeConfig.getStatIntervalSec();
- Preconditions.checkArgument(statIntervalSec >= 0, "statIntervalSec must be >= 0");
- // initial TubeMQ configure
- // initial resend queue size
- int badEventQueueSize = tubeConfig.getBadEventQueueSize();
- Preconditions.checkArgument(badEventQueueSize > 0, "badEventQueueSize must be > 0");
- resendQueue = new LinkedBlockingQueue<>(badEventQueueSize);
- // initial sink thread pool
- int threadNum = tubeConfig.getThreadNum();
- Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
- sinkThreadPool = new Thread[threadNum];
- // initial event queue size
- int eventQueueSize = tubeConfig.getEventQueueSize();
- Preconditions.checkArgument(eventQueueSize > 0, "eventQueueSize must be > 0");
- eventQueue = new LinkedBlockingQueue<>(eventQueueSize);
- // initial disk rate limiter
- if (tubeConfig.getDiskIoRatePerSec() != 0) {
- diskRateLimiter = RateLimiter.create(tubeConfig.getDiskIoRatePerSec());
- }
- // register configure change callback functions
- configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
-
- @Override
- public void update() {
- diffSetPublish(new HashSet<>(topicProperties.values()),
- new HashSet<>(configManager.getTopicProperties().values()));
- }
- });
- configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
-
- @Override
- public void update() {
- diffUpdateTubeClient(masterHostAndPortLists,
- configManager.getMqClusterUrl2Token().keySet());
- }
- });
- }
-
- @Override
- public void start() {
- if (!this.started.compareAndSet(false, true)) {
- logger.info("Duplicated call, " + getName() + " has started!");
- return;
- }
- // initial monitors
- if (statIntervalSec > 0) {
- // switch for lots of metrics
- monitorIndex = new MonitorIndex("Tube_Sink", statIntervalSec, maxMonitorCnt);
- monitorIndexExt = new MonitorIndexExt("Tube_Sink_monitors#" + this.getName(),
- statIntervalSec, maxMonitorCnt);
- }
- // register metrics
- ConfigManager configManager = ConfigManager.getInstance();
- String clusterId =
- configManager.getCommonProperties().getOrDefault(
- ConfigConstants.PROXY_CLUSTER_NAME,
- ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
- this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
- MetricRegister.register(metricItemSet);
- // create tube connection
- if (producerHolder != null) {
- try {
- producerHolder.start(new HashSet<>(topicProperties.values()));
- ConfigManager.getInstance().updMqClusterStatus(true);
- logger.info("[{}] MQ Cluster service status ready!", getName());
- } catch (FlumeException e) {
- logger.error("Unable to start TubeMQ client. Exception follows.", e);
- super.stop();
- return;
- }
- }
- // start the cleaner thread
- super.start();
- this.canSend = true;
- for (int i = 0; i < sinkThreadPool.length; i++) {
- sinkThreadPool[i] = new Thread(new TubeSinkTask(),
- getName() + "_tube_sink_sender-" + i);
- sinkThreadPool[i].start();
- }
- logger.info(getName() + " started!");
- }
-
- @Override
- public void stop() {
- if (!this.started.compareAndSet(true, false)) {
- logger.info("Duplicated call, " + getName() + " has stopped!");
- return;
- }
- // waiting inflight message processed
- int waitCount = 0;
- while (takenMsgCnt.get() > 0 && waitCount++ < 10) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- logger.info("Stop thread has been interrupt!");
- break;
- }
- }
- // close sink thread pool
- if (sinkThreadPool != null) {
- for (Thread thread : sinkThreadPool) {
- if (thread == null) {
- continue;
- }
- thread.interrupt();
- }
- }
- // close producer holder
- if (producerHolder != null) {
- producerHolder.stop();
- }
- // stop statistic thread stop
- SCHEDULED_EXECUTOR_SERVICE.shutdown();
- // stop monitor index output
- if (statIntervalSec > 0) {
- try {
- monitorIndex.shutDown();
- } catch (Exception e) {
- logger.warn("Stats runner interrupted");
- }
- }
- super.stop();
- logger.info(getName() + " stopped!");
- }
-
- @Override
- public Status process() throws EventDeliveryException {
- if (!this.started.get()) {
- return Status.BACKOFF;
- }
- Status status = Status.READY;
- Channel channel = getChannel();
- Transaction tx = channel.getTransaction();
- tx.begin();
- try {
- Event event = channel.take();
- if (event != null) {
- if (diskRateLimiter != null) {
- diskRateLimiter.acquire(event.getBody().length);
- }
- if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
- tx.commit();
- cachedMsgCnt.incrementAndGet();
- } else {
- tx.rollback();
- }
- } else {
- status = Status.BACKOFF;
- tx.commit();
- }
- } catch (Throwable t) {
- logger.error("Process event failed!" + this.getName(), t);
- try {
- tx.rollback();
- } catch (Throwable e) {
- logger.error("meta sink transaction rollback exception", e);
- }
- } finally {
- tx.close();
- }
- return status;
- }
-
- private class TubeSinkTask implements Runnable {
-
- public TubeSinkTask() {
- // ignore
- }
-
- @Override
- public void run() {
- Event event = null;
- EventStat es = null;
- String topic = null;
- boolean bChangedInflightValue = false;
- logger.info("sink task {} started.", Thread.currentThread().getName());
- while (canSend) {
- try {
- if (!started.get() && cachedMsgCnt.get() <= 0) {
- logger.info("Found started is false and taken message count is zero, braek!");
- break;
- }
- if (isOverFlow) {
- isOverFlow = false;
- Thread.sleep(30);
- }
- // get event from queues
- if (resendQueue.isEmpty()) {
- event = eventQueue.poll(2000, TimeUnit.MILLISECONDS);
- if (event == null) {
- continue;
- }
- cachedMsgCnt.decrementAndGet();
- takenMsgCnt.incrementAndGet();
- es = new EventStat(event);
- } else {
- es = resendQueue.poll();
- if (es == null) {
- continue;
- }
- resendMsgCnt.decrementAndGet();
- event = es.getEvent();
- }
- topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
- // valid event status
- if (StringUtils.isBlank(topic)) {
- blankTopicDiscardMsgCnt.incrementAndGet();
- takenMsgCnt.decrementAndGet();
- if (statIntervalSec > 0) {
- monitorIndexExt.incrementAndGet(KEY_SINK_DROPPED);
- }
- if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
- logger.error("No topic specified, just discard the event, event header is "
- + event.getHeaders().toString());
- }
- continue;
- }
- // send message
- bChangedInflightValue = sendMessage(es, topic);
- } catch (InterruptedException e) {
- logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
- return;
- } catch (Throwable t) {
- resendEvent(es, bChangedInflightValue,
- DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, t.getMessage());
- if (t instanceof TubeClientException) {
- String message = t.getMessage();
- if (message != null && (message.contains("No available queue for topic")
- || message.contains("The brokers of topic are all forbidden"))) {
- isOverFlow = true;
- }
- }
- if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
- logger.error(
- "Sink task fail to send the message, finished = {}, sink.name = {},event.headers= {}",
- bChangedInflightValue, Thread.currentThread().getName(),
- es.getEvent().getHeaders(), t);
- }
- }
- }
- logger.info("sink task {} stopped!", Thread.currentThread().getName());
- }
-
- private boolean sendMessage(EventStat es, String topic) throws Exception {
- String errMsg = "";
- Event event = es.getEvent();
- MessageProducer producer = producerHolder.getProducer(topic);
- if (producer == null) {
- frozenTopicDiscardMsgCnt.incrementAndGet();
- takenMsgCnt.decrementAndGet();
- if (statIntervalSec > 0) {
- monitorIndexExt.incrementAndGet(KEY_SINK_DROPPED);
- }
- errMsg = "Get producer failed for " + topic;
- if (MessageUtils.isSinkRspType(event)) {
- MessageUtils.sinkReturnRspPackage((SinkRspEvent) event,
- DataProxyErrCode.PRODUCER_IS_NULL, errMsg);
- }
- if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
- logger.error(errMsg);
- }
- return false;
- }
- if (MSG_DEDUP_HANDLER.judgeDupAndPutMsgSeqId(
- event.getHeaders().get(ConfigConstants.SEQUENCE_ID))) {
- dupDiscardMsgCnt.incrementAndGet();
- takenMsgCnt.decrementAndGet();
- if (statIntervalSec > 0) {
- monitorIndexExt.incrementAndGet(KEY_SINK_DROPPED);
- }
- errMsg = "Duplicated message discard, by uuid = "
- + event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
- if (MessageUtils.isSinkRspType(event)) {
- MessageUtils.sinkReturnRspPackage((SinkRspEvent) event,
- DataProxyErrCode.DUPLICATED_MESSAGE, errMsg);
- }
- logger.info("{} agent package {} existed,just discard.",
- Thread.currentThread().getName(),
- event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
- return false;
- } else {
- producer.sendMessage(TubeUtils.buildMessage(topic, event), new MyCallback(es));
- inflightMsgCnt.incrementAndGet();
- return true;
- }
- }
- }
-
- private class MyCallback implements MessageSentCallback {
-
- private EventStat myEventStat;
- private long sendTime;
-
- public MyCallback(EventStat eventStat) {
- this.myEventStat = eventStat;
- this.sendTime = System.currentTimeMillis();
- }
-
- @Override
- public void onMessageSent(final MessageSentResult result) {
- if (result.isSuccess()) {
- successMsgCnt.incrementAndGet();
- inflightMsgCnt.decrementAndGet();
- takenMsgCnt.decrementAndGet();
- this.addStatistics(myEventStat.getEvent(), true, false, sendTime);
- if (MessageUtils.isSinkRspType(myEventStat.getEvent())) {
- MessageUtils.sinkReturnRspPackage((SinkRspEvent) myEventStat.getEvent(),
- DataProxyErrCode.SUCCESS, "");
- }
- } else {
- this.addStatistics(myEventStat.getEvent(), false, false, 0);
- if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) {
- logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
- result.getErrMsg(), resendQueue.size(),
- myEventStat.getEvent().hashCode());
- return;
- } else if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW
- && LOG_SINK_TASK_PRINTER.shouldPrint()) {
- logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
- result.getErrMsg(), resendQueue.size(),
- myEventStat.getEvent().hashCode());
- }
- resendEvent(myEventStat, true, DataProxyErrCode.MQ_RETURN_ERROR,
- result.getErrCode() + "#" + result.getErrMsg());
- }
- }
-
- @Override
- public void onException(final Throwable e) {
- addStatistics(myEventStat.getEvent(), false, true, 0);
- resendEvent(myEventStat, true, DataProxyErrCode.UNKNOWN_ERROR, e.getMessage());
- }
-
- /**
- * Add statistics information
- *
- * @param event the statistic event
- * @param isSuccess is processed successfully
- * @param isException is exception when failure processed
- * @param sendTime the send time when success processed
- */
- private void addStatistics(Event event, boolean isSuccess,
- boolean isException, long sendTime) {
- if (event == null) {
- return;
- }
- // add jmx metric items;
- TubeSink.this.metricItemSet.fillSinkSendMetricItemsByEvent(
- event, sendTime, isSuccess, event.getBody().length);
- // add audit items;
- if (isSuccess) {
- AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
- }
- if (statIntervalSec <= 0) {
- return;
- }
- // add monitor items base file storage
- String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
- String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
- String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
- int intMsgCnt = Integer.parseInt(
- event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
- long dataTimeL = Long.parseLong(
- event.getHeaders().get(AttributeConstants.DATA_TIME));
- Pair<Boolean, String> evenProcType =
- MessageUtils.getEventProcType("", "");
- // build statistic key
- StringBuilder newBase = new StringBuilder(512)
- .append(getName()).append(SEP_HASHTAG).append(topic)
- .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
- .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
- .append(SEP_HASHTAG).append(evenProcType.getRight()).append(SEP_HASHTAG)
- .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
- // count data
- if (isSuccess) {
- monitorIndex.addAndGet(newBase.toString(),
- intMsgCnt, 1, event.getBody().length, 0);
- monitorIndexExt.incrementAndGet(KEY_SINK_SUCCESS);
- } else {
- monitorIndex.addAndGet(newBase.toString(),
- 0, 0, 0, intMsgCnt);
- monitorIndexExt.incrementAndGet(KEY_SINK_FAILURE);
- if (isException) {
- monitorIndexExt.incrementAndGet(KEY_SINK_EXP);
- }
- }
- }
- }
-
- private class TubeStatsTask implements Runnable {
-
- @Override
- public void run() {
- if (!started.get()) {
- return;
- }
- logger.info(getName() + "[TubeSink Stats] cachedMsgCnt=" + cachedMsgCnt.get()
- + ", takenMsgCnt=" + takenMsgCnt.get()
- + ", resendMsgCnt=" + resendMsgCnt.get()
- + ", blankTopicDiscardMsgCnt=" + blankTopicDiscardMsgCnt.get()
- + ", frozenTopicDiscardMsgCnt=" + frozenTopicDiscardMsgCnt.get()
- + ", dupDiscardMsgCnt=" + dupDiscardMsgCnt.get()
- + ", inflightMsgCnt=" + inflightMsgCnt.get()
- + ", successMsgCnt=" + successMsgCnt.get());
- }
- }
-
- /**
- * resend event
- */
- private void resendEvent(EventStat es, boolean sendFinished,
- DataProxyErrCode errCode, String errMsg) {
- try {
- if (sendFinished) {
- inflightMsgCnt.decrementAndGet();
- }
- if (es == null || es.getEvent() == null) {
- takenMsgCnt.decrementAndGet();
- return;
- }
- MSG_DEDUP_HANDLER.invalidMsgSeqId(es.getEvent()
- .getHeaders().get(ConfigConstants.SEQUENCE_ID));
- if (MessageUtils.isSinkRspType(es.getEvent())) {
- MessageUtils.sinkReturnRspPackage((SinkRspEvent) es.getEvent(), errCode, errMsg);
- } else {
- if (resendQueue.offer(es)) {
- resendMsgCnt.incrementAndGet();
- } else {
- FailoverChannelProcessorHolder.getChannelProcessor().processEvent(es.getEvent());
- takenMsgCnt.decrementAndGet();
- if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
- logger.error(Thread.currentThread().getName()
- + " Channel --> Tube --> ResendQueue(full) -->"
- + "FailOverChannelProcessor(current code point),"
- + " Resend queue is full,Check if Tube server or network is ok.");
- }
- }
- }
- } catch (Throwable throwable) {
- takenMsgCnt.decrementAndGet();
- if (statIntervalSec > 0) {
- monitorIndexExt.incrementAndGet(KEY_SINK_DROPPED);
- }
- if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
- logger.error(getName() + " Discard msg because put events to both of queue and "
- + "fileChannel fail,current resendQueue.size = "
- + resendQueue.size(), throwable);
- }
- }
- }
-
- /**
- * Differentiate unpublished topic sets and publish them
- * attention: only append added topics
- *
- * @param curTopicSet the current used topic set
- * @param newTopicSet the latest configured topic set
- */
- private void diffSetPublish(Set<String> curTopicSet, Set<String> newTopicSet) {
- if (!this.started.get()) {
- logger.info(getName() + " not started, ignore this change!");
- }
- if (SetUtils.isEqualSet(curTopicSet, newTopicSet)) {
- return;
- }
- // filter unpublished topics
- Set<String> addedTopics = new HashSet<>();
- for (String topic : newTopicSet) {
- if (StringUtils.isBlank(topic)) {
- continue;
- }
- if (!curTopicSet.contains(topic)) {
- addedTopics.add(topic);
- }
- }
- // publish them
- if (!addedTopics.isEmpty()) {
- if (producerHolder != null) {
- try {
- producerHolder.createProducersByTopicSet(addedTopics);
- } catch (Throwable e) {
- logger.info(getName() + "'s publish new topic set fail.", e);
- }
- }
- logger.info(getName() + "'s topics set has changed, trigger diff publish for {}",
- addedTopics);
- topicProperties = configManager.getTopicProperties();
- }
- }
-
- /**
- * When masterUrlLists change, update tubeClient
- * Requirement: when switching the Master cluster,
- * the DataProxy node must not do the data reporting service
- *
- * @param curClusterSet previous masterHostAndPortList set
- * @param newClusterSet new masterHostAndPortList set
- */
- private void diffUpdateTubeClient(Set<String> curClusterSet,
- Set<String> newClusterSet) {
- if (!this.started.get()) {
- logger.info(getName() + " not started, ignore this change!");
- }
- if (newClusterSet == null || newClusterSet.isEmpty()
- || SetUtils.isEqualSet(curClusterSet, newClusterSet)
- || newClusterSet.contains(usedMasterAddr)) {
- return;
- }
- String newMasterAddr = getFirstClusterAddr(newClusterSet);
- if (newMasterAddr == null) {
- return;
- }
- TubeProducerHolder newProducerHolder = new TubeProducerHolder(getName(),
- newMasterAddr, configManager.getMqClusterConfig());
- try {
- newProducerHolder.start(new HashSet<>(configManager.getTopicProperties().values()));
- } catch (Throwable e) {
- logger.error(getName() + " create new producer holder for " + newMasterAddr
- + " failure, throw exception is {}", e.getMessage());
- return;
- }
- // replace current producer holder
- final String tmpMasterAddr = usedMasterAddr;
- TubeProducerHolder tmpProducerHolder = producerHolder;
- producerHolder = newProducerHolder;
- usedMasterAddr = newMasterAddr;
- // close old producer holder
- if (tmpProducerHolder == null) {
- diffSetPublish(new HashSet<>(),
- new HashSet<>(configManager.getTopicProperties().values()));
- } else {
- tmpProducerHolder.stop();
- }
- if (!ConfigManager.getInstance().isMqClusterReady()) {
- ConfigManager.getInstance().updMqClusterStatus(true);
- logger.info("[{}] MQ Cluster service status ready!", getName());
- }
- logger.info(getName() + " switch cluster from "
- + tmpMasterAddr + " to " + usedMasterAddr);
- }
-
- /**
- * Get first cluster address
- *
- * @param clusterSet cluster set configure
- * @return the selected cluster address
- * null if set is empty or if items are all blank
- */
- private String getFirstClusterAddr(Set<String> clusterSet) {
- String tmpMasterAddr = null;
- for (String masterAddr : clusterSet) {
- if (StringUtils.isBlank(masterAddr)) {
- continue;
- }
- tmpMasterAddr = masterAddr;
- break;
- }
- return tmpMasterAddr;
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstactZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstactZoneWorker.java
deleted file mode 100644
index 3692c0ab3..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstactZoneWorker.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AbstactZoneWorker extends Thread {
-
- public static final Logger LOG = LoggerFactory.getLogger(AbstactZoneWorker.class);
-
- protected final String workerName;
- protected final AbstractZoneSinkContext context;
-
- protected AbstractZoneProducer zoneProducer;
- protected LifecycleState status;
-
- protected int workerIndex;
-
- /**
- * Constructor
- *
- * @param sinkName
- * @param workerIndex
- * @param context
- */
- public AbstactZoneWorker(String sinkName, int workerIndex, AbstractZoneSinkContext context,
- AbstractZoneProducer zoneProducer) {
- super();
- this.workerName = sinkName + "-worker-" + workerIndex;
- this.workerIndex = workerIndex;
- this.context = context;
- this.zoneProducer = zoneProducer;
- this.status = LifecycleState.IDLE;
- }
-
- /**
- * start
- */
- @Override
- public void start() {
- this.zoneProducer.start();
- this.status = LifecycleState.START;
- super.start();
- }
-
- /**
- *
- * close
- */
- public void close() {
- // close all producers
- this.zoneProducer.close();
- this.status = LifecycleState.STOP;
- }
-
- /**
- * run
- */
- @Override
- public void run() {
- while (status != LifecycleState.STOP) {
- try {
- DispatchProfile event = context.getDispatchQueues().get(workerIndex).poll();
- if (event == null) {
- this.sleepOneInterval();
- continue;
- }
- // metric
- context.addSendMetric(event, workerName);
- // send
- this.zoneProducer.send(event);
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- this.sleepOneInterval();
- }
- }
- }
-
- /**
- * sleepOneInterval
- */
- private void sleepOneInterval() {
- try {
- Thread.sleep(context.getProcessInterval());
- } catch (InterruptedException e1) {
- LOG.error(e1.getMessage(), e1);
- }
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneClusterProducer.java
deleted file mode 100644
index 9fbd07894..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneClusterProducer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-import static org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
-import static org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.flume.Context;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.sdk.commons.protocol.EventConstants;
-
-public abstract class AbstractZoneClusterProducer implements LifecycleAware {
-
- protected final String workerName;
- protected final CacheClusterConfig config;
- protected final AbstractZoneSinkContext sinkContext;
- protected final Context producerContext;
- protected final String cacheClusterName;
- protected LifecycleState state;
-
- /**
- * Constructor
- *
- * @param workerName
- * @param config
- * @param context
- */
- public AbstractZoneClusterProducer(String workerName, CacheClusterConfig config, AbstractZoneSinkContext context) {
- this.workerName = workerName;
- this.config = config;
- this.sinkContext = context;
- this.producerContext = context.getProducerContext();
- this.state = LifecycleState.IDLE;
- this.cacheClusterName = config.getClusterName();
- }
-
- /**
- * getLifecycleState
- *
- * @return
- */
- @Override
- public LifecycleState getLifecycleState() {
- return state;
- }
-
- /**
- * send DispatchProfile
- *
- * @param event DispatchProfile
- * @return boolean sendResult
- */
- public abstract boolean send(DispatchProfile event);
-
- /**
- * encodeCacheMessageHeaders
- *
- * @param event
- * @return Map
- */
- public Map<String, String> encodeCacheMessageHeaders(DispatchProfile event) {
- Map<String, String> headers = new HashMap<>();
- // version int32 protocol version, the value is 1
- headers.put(HEADER_KEY_VERSION, HEADER_CACHE_VERSION_1);
- // inlongGroupId string inlongGroupId
- headers.put(EventConstants.INLONG_GROUP_ID, event.getInlongGroupId());
- // inlongStreamId string inlongStreamId
- headers.put(EventConstants.INLONG_STREAM_ID, event.getInlongStreamId());
- // proxyName string proxy node id, IP or conainer name
- headers.put(EventConstants.HEADER_KEY_PROXY_NAME, sinkContext.getNodeId());
- // packTime int64 pack time, milliseconds
- headers.put(EventConstants.HEADER_KEY_PACK_TIME, String.valueOf(System.currentTimeMillis()));
- // msgCount int32 message count
- headers.put(EventConstants.HEADER_KEY_MSG_COUNT, String.valueOf(event.getEvents().size()));
- // srcLength int32 total length of raw messages body
- headers.put(EventConstants.HEADER_KEY_SRC_LENGTH, String.valueOf(event.getSize()));
- // compressType int
- // compress type of body data
- // INLONG_NO_COMPRESS = 0,
- // INLONG_GZ = 1,
- // INLONG_SNAPPY = 2
- headers.put(EventConstants.HEADER_KEY_COMPRESS_TYPE,
- String.valueOf(sinkContext.getCompressType().getNumber()));
- // messageKey string partition hash key, optional
- return headers;
- }
-
- /**
- * get cacheClusterName
- *
- * @return the cacheClusterName
- */
- public String getCacheClusterName() {
- return cacheClusterName;
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneProducer.java
deleted file mode 100644
index 7be2f38a3..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneProducer.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractZoneProducer {
-
- public static final Logger LOG = LoggerFactory.getLogger(AbstractZoneProducer.class);
- public static final int MAX_INDEX = Integer.MAX_VALUE / 2;
-
- protected final String workerName;
- protected final AbstractZoneSinkContext context;
- protected Timer reloadTimer;
-
- protected List<AbstractZoneClusterProducer> clusterList = new ArrayList<>();
- protected List<AbstractZoneClusterProducer> deletingClusterList = new ArrayList<>();
-
- protected AtomicInteger clusterIndex = new AtomicInteger(0);
-
- public AbstractZoneProducer(String workerName,
- AbstractZoneSinkContext context) {
- this.workerName = workerName;
- this.context = context;
- }
-
- /**
- * start
- */
- public void start() {
- try {
- this.reload();
- this.setReloadTimer();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- /**
- * close
- */
- public void close() {
- try {
- this.reloadTimer.cancel();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- for (AbstractZoneClusterProducer cluster : this.clusterList) {
- cluster.stop();
- }
- }
-
- /**
- * setReloadTimer
- */
- private void setReloadTimer() {
- reloadTimer = new Timer(true);
- TimerTask task = new TimerTask() {
-
- public void run() {
- reload();
- }
- };
- reloadTimer.schedule(task, new Date(System.currentTimeMillis() + context.getReloadInterval()),
- context.getReloadInterval());
- }
-
- /**
- * reload
- */
- public abstract void reload();
-
- protected void reload(ZoneClusterProducerCalculator zoneClusterProducerCalculator) {
- try {
- // stop deleted cluster
- deletingClusterList.forEach(item -> {
- item.stop();
- });
- deletingClusterList.clear();
- // update cluster list
- List<CacheClusterConfig> configList = this.context.getCacheHolder().getConfigList();
- List<AbstractZoneClusterProducer> newClusterList = new ArrayList<>(configList.size());
- // prepare
- Set<String> newClusterNames = new HashSet<>();
- configList.forEach(item -> {
- newClusterNames.add(item.getClusterName());
- });
- Set<String> oldClusterNames = new HashSet<>();
- clusterList.forEach(item -> {
- oldClusterNames.add(item.getCacheClusterName());
- });
- // add
- for (CacheClusterConfig config : configList) {
- if (!oldClusterNames.contains(config.getClusterName())) {
- AbstractZoneClusterProducer cluster = zoneClusterProducerCalculator.calculator(workerName,
- config, context);
- cluster.start();
- newClusterList.add(cluster);
- }
- }
- // remove
- for (AbstractZoneClusterProducer cluster : this.clusterList) {
- if (newClusterNames.contains(cluster.getCacheClusterName())) {
- newClusterList.add(cluster);
- } else {
- deletingClusterList.add(cluster);
- }
- }
- this.clusterList = newClusterList;
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- }
-
- }
-
- /**
- * send
- *
- * @param event
- */
- public boolean send(DispatchProfile event) {
- int currentIndex = clusterIndex.getAndIncrement();
- if (currentIndex > MAX_INDEX) {
- clusterIndex.set(0);
- }
- List<AbstractZoneClusterProducer> currentClusterList = this.clusterList;
- int currentSize = currentClusterList.size();
- int realIndex = currentIndex % currentSize;
- AbstractZoneClusterProducer clusterProducer = currentClusterList.get(realIndex);
- return clusterProducer.send(event);
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSink.java
deleted file mode 100644
index 9776c83f3..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSink.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.inlong.dataproxy.dispatch.DispatchManager;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.pulsar.PulsarClientService;
-import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
-import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractZoneSink extends AbstractSink implements Configurable {
-
- public static final Logger LOG = LoggerFactory.getLogger(AbstractZoneSink.class);
-
- protected Context parentContext;
- protected AbstractZoneSinkContext context;
- protected List<AbstactZoneWorker> workers = new ArrayList<>();
- // message group
- protected DispatchManager dispatchManager;
- protected ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues = new ArrayList<>();
- // scheduled thread pool
- // reload
- // dispatch
- protected ScheduledExecutorService scheduledPool;
-
- /**
- * configure
- *
- * @param context
- */
- @Override
- public void configure(Context context) {
- LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
- this.parentContext = context;
- }
-
- public void start(ZoneWorkerCalculator zoneWorkerCalculator) {
- try {
- if (getChannel() == null) {
- LOG.error("channel is null");
- }
- this.context.start();
- for (int i = 0; i < context.getMaxThreads(); i++) {
- LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
- dispatchQueues.add(dispatchQueue);
- }
- this.dispatchManager = new DispatchManager(parentContext, dispatchQueues);
- this.scheduledPool = Executors.newScheduledThreadPool(2);
- // dispatch
- this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
-
- public void run() {
- dispatchManager.setNeedOutputOvertimeData();
- }
- }, this.dispatchManager.getDispatchTimeout(),
- this.dispatchManager.getDispatchTimeout(),
- TimeUnit.MILLISECONDS);
- // create worker
- for (int i = 0; i < context.getMaxThreads(); i++) {
- AbstactZoneWorker worker = zoneWorkerCalculator.calculator(this.getName(), i, context);
- worker.start();
- this.workers.add(worker);
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- super.start();
- }
-
- @Deprecated
- public void diffSetPublish(PulsarClientService pulsarClientService, Set<String> originalSet, Set<String> endSet) {
- return;
- }
-
- @Deprecated
- public void diffUpdatePulsarClient(PulsarClientService pulsarClientService, Map<String, String> originalCluster,
- Map<String, String> endCluster) {
- this.workers.forEach(worker -> {
- worker.zoneProducer.reload();
- });
- }
-
- /**
- * stop
- */
- @Override
- public void stop() {
- for (AbstactZoneWorker worker : workers) {
- try {
- worker.close();
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- }
- }
- this.context.close();
- super.stop();
- }
-
- /**
- * process
- *
- * @return Status
- * @throws EventDeliveryException
- */
- @Override
- public Sink.Status process() throws EventDeliveryException {
- this.dispatchManager.outputOvertimeData();
- Channel channel = getChannel();
- Transaction tx = channel.getTransaction();
- tx.begin();
- try {
- Event event = channel.take();
- if (event == null) {
- tx.commit();
- return Sink.Status.BACKOFF;
- }
- // ProxyEvent
- if (event instanceof ProxyEvent) {
- ProxyEvent proxyEvent = (ProxyEvent) event;
- this.dispatchManager.addEvent(proxyEvent);
- tx.commit();
- return Sink.Status.READY;
- }
- // ProxyPackEvent
- if (event instanceof ProxyPackEvent) {
- ProxyPackEvent packEvent = (ProxyPackEvent) event;
- this.dispatchManager.addPackEvent(packEvent);
- tx.commit();
- return Sink.Status.READY;
- }
- tx.commit();
- this.context.addSendFailMetric();
- return Sink.Status.READY;
- } catch (Throwable t) {
- LOG.error("Process event failed!" + this.getName(), t);
- try {
- tx.rollback();
- } catch (Throwable e) {
- LOG.error("Channel take transaction rollback exception:" + getName(), e);
- }
- return Sink.Status.BACKOFF;
- } finally {
- tx.close();
- }
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSinkContext.java
deleted file mode 100644
index d04d8b225..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSinkContext.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
-import org.apache.inlong.sdk.commons.protocol.ProxySdk;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Date;
-import java.util.Timer;
-import java.util.TimerTask;
-
-/**
- * SinkContext
- */
-public abstract class AbstractZoneSinkContext {
-
- public static final Logger LOG = LoggerFactory.getLogger(AbstractZoneSinkContext.class);
-
- public static final String KEY_MAX_THREADS = "maxThreads";
- public static final String KEY_PROCESS_INTERVAL = "processInterval";
- public static final String KEY_RELOAD_INTERVAL = "reloadInterval";
-
- protected final String sinkName;
- protected final Context sinkContext;
-
- protected final Channel channel;
-
- protected final int maxThreads;
- protected final long processInterval;
- protected final long reloadInterval;
-
- protected final DataProxyMetricItemSet metricItemSet;
- protected Timer reloadTimer;
-
- public static final String KEY_NODE_ID = "nodeId";
- public static final String PREFIX_PRODUCER = "producer.";
- public static final String KEY_COMPRESS_TYPE = "compressType";
-
- protected ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues = new ArrayList<>();
-
- protected final String proxyClusterId;
- protected final String nodeId;
- protected final Context producerContext;
- protected final IdTopicConfigHolder idTopicHolder;
- protected final CacheClusterConfigHolder cacheHolder;
- protected final ProxySdk.INLONG_COMPRESSED_TYPE compressType;
-
- /**
- * Constructor
- */
- public AbstractZoneSinkContext(String sinkName, Context context, Channel channel,
- ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues) {
- this.sinkName = sinkName;
- this.sinkContext = context;
- this.channel = channel;
- this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
- this.processInterval = sinkContext.getInteger(KEY_PROCESS_INTERVAL, 100);
- this.reloadInterval = sinkContext.getLong(KEY_RELOAD_INTERVAL, 60000L);
- //
- this.metricItemSet = new DataProxyMetricItemSet(sinkName);
- MetricRegister.register(this.metricItemSet);
-
- this.dispatchQueues = dispatchQueues;
- // proxyClusterId
- this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
- // nodeId
- this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
- // compressionType
- String strCompressionType = CommonPropertiesHolder.getString(KEY_COMPRESS_TYPE,
- ProxySdk.INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name());
- this.compressType = ProxySdk.INLONG_COMPRESSED_TYPE.valueOf(strCompressionType);
- // producerContext
- Map<String, String> producerParams = context.getSubProperties(PREFIX_PRODUCER);
- this.producerContext = new Context(producerParams);
- // idTopicHolder
- Context commonPropertiesContext = new Context(CommonPropertiesHolder.get());
- this.idTopicHolder = new IdTopicConfigHolder();
- this.idTopicHolder.configure(commonPropertiesContext);
- // cacheHolder
- this.cacheHolder = new CacheClusterConfigHolder();
- this.cacheHolder.configure(commonPropertiesContext);
- }
-
- /**
- * start
- */
- public void start() {
- try {
- this.reload();
- this.setReloadTimer();
- this.idTopicHolder.start();
- this.cacheHolder.start();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- /**
- * close
- */
- public void close() {
- try {
- this.reloadTimer.cancel();
- this.idTopicHolder.close();
- this.cacheHolder.close();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- /**
- * setReloadTimer
- */
- protected void setReloadTimer() {
- reloadTimer = new Timer(true);
- TimerTask task = new TimerTask() {
-
- public void run() {
- reload();
- }
- };
- reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
- }
-
- /**
- * reload
- */
- public void reload() {
- }
-
- /**
- * get sinkName
- *
- * @return the sinkName
- */
- public String getSinkName() {
- return sinkName;
- }
-
- /**
- * get sinkContext
- *
- * @return the sinkContext
- */
- public Context getSinkContext() {
- return sinkContext;
- }
-
- /**
- * get channel
- *
- * @return the channel
- */
- public Channel getChannel() {
- return channel;
- }
-
- /**
- * get maxThreads
- *
- * @return the maxThreads
- */
- public int getMaxThreads() {
- return maxThreads;
- }
-
- /**
- * get processInterval
- *
- * @return the processInterval
- */
- public long getProcessInterval() {
- return processInterval;
- }
-
- /**
- * get reloadInterval
- *
- * @return the reloadInterval
- */
- public long getReloadInterval() {
- return reloadInterval;
- }
-
- /**
- * get metricItemSet
- *
- * @return the metricItemSet
- */
- public DataProxyMetricItemSet getMetricItemSet() {
- return metricItemSet;
- }
-
- /**
- * get proxyClusterId
- *
- * @return the proxyClusterId
- */
- public String getProxyClusterId() {
- return proxyClusterId;
- }
-
- /**
- * get producerContext
- *
- * @return the producerContext
- */
- public Context getProducerContext() {
- return producerContext;
- }
-
- /**
- * get idTopicHolder
- *
- * @return the idTopicHolder
- */
- public IdTopicConfigHolder getIdTopicHolder() {
- return idTopicHolder;
- }
-
- /**
- * get cacheHolder
- *
- * @return the cacheHolder
- */
- public CacheClusterConfigHolder getCacheHolder() {
- return cacheHolder;
- }
-
- /**
- * get compressType
- *
- * @return the compressType
- */
- public ProxySdk.INLONG_COMPRESSED_TYPE getCompressType() {
- return compressType;
- }
-
- /**
- * get nodeId
- *
- * @return the nodeId
- */
- public String getNodeId() {
- return nodeId;
- }
-
- /**
- * fillInlongId
- *
- * @param currentRecord
- * @param dimensions
- */
- public static void fillInlongId(DispatchProfile currentRecord, Map<String, String> dimensions) {
- String inlongGroupId = currentRecord.getInlongGroupId();
- inlongGroupId = (StringUtils.isBlank(inlongGroupId)) ? "-" : inlongGroupId;
- String inlongStreamId = currentRecord.getInlongStreamId();
- inlongStreamId = (StringUtils.isBlank(inlongStreamId)) ? "-" : inlongStreamId;
- dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, inlongGroupId);
- dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId);
- }
-
- /**
- * addSendResultMetric
- *
- * @param currentRecord
- * @param bid
- * @param result
- * @param sendTime
- */
- public void addSendResultMetric(DispatchProfile currentRecord, String bid, boolean result, long sendTime) {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
- // metric
- fillInlongId(currentRecord, dimensions);
- dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
- long dispatchTime = currentRecord.getDispatchTime();
- long auditFormatTime = dispatchTime - dispatchTime % CommonPropertiesHolder.getAuditFormatInterval();
- dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
- DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
- long count = currentRecord.getCount();
- long size = currentRecord.getSize();
- if (result) {
- metricItem.sendSuccessCount.addAndGet(count);
- metricItem.sendSuccessSize.addAndGet(size);
- currentRecord.getEvents().forEach((event) -> {
- AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
- });
- if (sendTime > 0) {
- long currentTime = System.currentTimeMillis();
- currentRecord.getEvents().forEach((event) -> {
- long sinkDuration = currentTime - sendTime;
- long nodeDuration = currentTime - event.getSourceTime();
- long wholeDuration = currentTime - event.getMsgTime();
- metricItem.sinkDuration.addAndGet(sinkDuration);
- metricItem.nodeDuration.addAndGet(nodeDuration);
- metricItem.wholeDuration.addAndGet(wholeDuration);
- });
- }
- } else {
- metricItem.sendFailCount.addAndGet(count);
- metricItem.sendFailSize.addAndGet(size);
- }
- }
-
- /**
- * get dispatchQueue
- *
- * @return the dispatchQueue
- */
- public ArrayList<LinkedBlockingQueue<DispatchProfile>> getDispatchQueues() {
- return dispatchQueues;
- }
-
- public void setDispatchQueues(
- ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues) {
- this.dispatchQueues = dispatchQueues;
- }
-
- /**
- * processSendFail
- * @param currentRecord
- * @param producerTopic
- * @param sendTime
- */
- public void processSendFail(DispatchProfile currentRecord, String producerTopic, long sendTime) {
- if (currentRecord.isResend()) {
- dispatchQueues.get(currentRecord.getSendIndex() % maxThreads).offer(currentRecord);
- this.addSendResultMetric(currentRecord, producerTopic, false, sendTime);
- } else {
- currentRecord.fail();
- }
- }
-
- /**
- * addSendFailMetric
- */
- public void addSendFailMetric() {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
- dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
- long msgTime = System.currentTimeMillis();
- long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
- dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
- DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
- metricItem.sendFailCount.incrementAndGet();
- }
-
- /**
- * addSendMetric
- *
- * @param currentRecord
- * @param bid
- */
- public void addSendMetric(DispatchProfile currentRecord, String bid) {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
- // metric
- fillInlongId(currentRecord, dimensions);
- dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
- long msgTime = currentRecord.getDispatchTime();
- long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
- dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
- DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
- long count = currentRecord.getCount();
- long size = currentRecord.getSize();
- metricItem.sendCount.addAndGet(count);
- metricItem.sendSize.addAndGet(size);
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/ZoneClusterProducerCalculator.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/ZoneClusterProducerCalculator.java
deleted file mode 100644
index 673af1feb..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/ZoneClusterProducerCalculator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-
-@FunctionalInterface
-public interface ZoneClusterProducerCalculator {
-
- AbstractZoneClusterProducer calculator(String workerName,
- CacheClusterConfig config, AbstractZoneSinkContext context);
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/ZoneWorkerCalculator.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/ZoneWorkerCalculator.java
deleted file mode 100644
index 6bb300f9d..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/ZoneWorkerCalculator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone;
-
-public interface ZoneWorkerCalculator {
-
- AbstactZoneWorker calculator(String sinkName, int workerIndex, AbstractZoneSinkContext context);
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaClusterProducer.java
deleted file mode 100644
index 9e85028a4..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaClusterProducer.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.kafkazone;
-
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
-import org.apache.inlong.sdk.commons.protocol.EventUtils;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * KafkaClusterProducer
- */
-public class KafkaClusterProducer extends AbstractZoneClusterProducer {
-
- public static final Logger LOG = LoggerFactory.getLogger(KafkaClusterProducer.class);
-
- // kafka producer
- private KafkaProducer<String, byte[]> producer;
-
- /**
- * Constructor
- *
- * @param workerName
- * @param config
- * @param context
- */
- public KafkaClusterProducer(String workerName, CacheClusterConfig config, KafkaZoneSinkContext context) {
- super(workerName, config, context);
- }
-
- /**
- * start
- */
- @Override
- public void start() {
- super.state = LifecycleState.START;
- // create kafka producer
- try {
- // prepare configuration
- Properties props = new Properties();
- props.putAll(super.producerContext.getParameters());
- props.putAll(config.getParams());
- LOG.info("try to create kafka client:{}", props);
- producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
- LOG.info("create new producer success:{}", producer);
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- /**
- * stop
- */
- @Override
- public void stop() {
- super.state = LifecycleState.STOP;
- // kafka producer
- this.producer.close();
- }
-
- /**
- * send
- *
- * @param event
- */
- @Override
- public boolean send(DispatchProfile event) {
- try {
- // topic
- String topic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
- if (topic == null) {
- sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
- return false;
- }
- // create producer failed
- if (producer == null) {
- sinkContext.processSendFail(event, topic, 0);
- return false;
- }
- // headers
- Map<String, String> headers = this.encodeCacheMessageHeaders(event);
- // compress
- byte[] bodyBytes = EventUtils.encodeCacheMessageBody(sinkContext.getCompressType(), event.getEvents());
- // sendAsync
- long sendTime = System.currentTimeMillis();
-
- // prepare ProducerRecord
- ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, bodyBytes);
- // add headers
- headers.forEach((key, value) -> {
- producerRecord.headers().add(key, value.getBytes());
- });
-
- // callback
- Callback callback = new Callback() {
-
- @Override
- public void onCompletion(RecordMetadata arg0, Exception ex) {
- if (ex != null) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
- if (event.isResend()) {
- sinkContext.processSendFail(event, topic, sendTime);
- } else {
- event.fail();
- }
- } else {
- sinkContext.addSendResultMetric(event, topic, true, sendTime);
- event.ack();
- }
- }
- };
- producer.send(producerRecord, callback);
- return true;
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- sinkContext.processSendFail(event, event.getUid(), 0);
- return false;
- }
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneProducer.java
deleted file mode 100644
index 3376c0d9b..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneProducer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.kafkazone;
-
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneProducer;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext;
-import org.apache.inlong.dataproxy.sink.mqzone.ZoneClusterProducerCalculator;
-
-public class KafkaZoneProducer extends AbstractZoneProducer implements ZoneClusterProducerCalculator {
-
- /**
- * Constructor
- *
- * @param workerName
- * @param context
- */
- public KafkaZoneProducer(String workerName, KafkaZoneSinkContext context) {
- super(workerName, context);
- }
-
- /**
- * reload
- */
- public void reload() {
- super.reload(this);
- }
-
- @Override
- public AbstractZoneClusterProducer calculator(String workerName, CacheClusterConfig config,
- AbstractZoneSinkContext context) {
- return new KafkaClusterProducer(workerName, config, (KafkaZoneSinkContext) context);
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneSink.java
deleted file mode 100644
index c3a71b76f..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneSink.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.kafkazone;
-
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * KafkaZoneSink
- */
-public class KafkaZoneSink extends AbstractZoneSink {
-
- public static final Logger LOG = LoggerFactory.getLogger(KafkaZoneSink.class);
-
- /**
- * start
- */
- @Override
- public void start() {
- try {
- super.context = new KafkaZoneSinkContext(getName(), parentContext, getChannel(), super.dispatchQueues);
- super.start((sinkName, workIndex, context) -> new KafkaZoneWorker(sinkName, workIndex,
- (KafkaZoneSinkContext) context));
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- super.start();
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneSinkContext.java
deleted file mode 100644
index d6545cf78..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneSinkContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.kafkazone;
-
-import java.util.ArrayList;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- *
- * KafkaZoneSinkContext
- */
-public class KafkaZoneSinkContext extends AbstractZoneSinkContext {
-
- /**
- * Constructor
- *
- * @param context
- */
- public KafkaZoneSinkContext(String sinkName, Context context, Channel channel,
- ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues) {
- super(sinkName, context, channel, dispatchQueues);
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneWorker.java
deleted file mode 100644
index 7f143886c..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaZoneWorker.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.kafkazone;
-
-import org.apache.inlong.dataproxy.sink.mqzone.AbstactZoneWorker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * KafkaZoneWorker
- */
-public class KafkaZoneWorker extends AbstactZoneWorker {
-
- public static final Logger LOG = LoggerFactory.getLogger(KafkaZoneWorker.class);
-
- /**
- * Constructor
- *
- * @param sinkName
- * @param workerIndex
- * @param context
- */
- public KafkaZoneWorker(String sinkName, int workerIndex, KafkaZoneSinkContext context) {
- super(sinkName, workerIndex, context,
- new KafkaZoneProducer(sinkName + "-worker-" + workerIndex, context));
-
- }
-
- /**
- * run
- */
- @Override
- public void run() {
- LOG.info(String.format("start KafkaZoneWorker:%s", this.workerName));
- super.run();
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarClusterProducer.java
deleted file mode 100644
index 2de80ec26..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarClusterProducer.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.pulsarzone;
-
-import org.apache.commons.lang.math.NumberUtils;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
-import org.apache.inlong.sdk.commons.protocol.EventUtils;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageRoutingMode;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerAccessMode;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SizeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.security.SecureRandom;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_AUTHENTICATION;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_BATCHINGMAXBYTES;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_BATCHINGMAXMESSAGES;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_BATCHINGMAXPUBLISHDELAY;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_BLOCKIFQUEUEFULL;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_COMPRESSIONTYPE;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_CONNECTIONSPERBROKER;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_ENABLEBATCHING;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_IOTHREADS;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_MAXPENDINGMESSAGES;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_MAXPENDINGMESSAGESACROSSPARTITIONS;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_MEMORYLIMIT;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_NAMESPACE;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_SENDTIMEOUT;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_SERVICE_URL;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_STATS_INTERVAL_SECONDS;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_TENANT;
-
-/**
- * PulsarClusterProducer
- */
-public class PulsarClusterProducer extends AbstractZoneClusterProducer {
-
- public static final Logger LOG = LoggerFactory.getLogger(PulsarClusterProducer.class);
-
- private String tenant;
- private String namespace;
-
- /**
- * pulsar client
- */
- private PulsarClient client;
- private ProducerBuilder<byte[]> baseBuilder;
- private Map<String, Producer<byte[]>> producerMap = new ConcurrentHashMap<>();
-
- /**
- * Constructor
- *
- * @param workerName Worker name
- * @param config Cache cluster configuration
- * @param context Sink context
- */
- public PulsarClusterProducer(String workerName, CacheClusterConfig config, PulsarZoneSinkContext context) {
- super(workerName, config, context);
- this.tenant = config.getParams().get(KEY_TENANT);
- this.namespace = config.getParams().get(KEY_NAMESPACE);
- }
-
- /**
- * start
- */
- @Override
- public void start() {
- this.state = LifecycleState.START;
- // create pulsar client
- try {
- String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
- String authentication = config.getParams().get(KEY_AUTHENTICATION);
- this.client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .authentication(AuthenticationFactory.token(authentication))
- .ioThreads(producerContext.getInteger(KEY_IOTHREADS, 1))
- .memoryLimit(producerContext.getLong(KEY_MEMORYLIMIT, 1073741824L), SizeUnit.BYTES)
- .connectionsPerBroker(producerContext.getInteger(KEY_CONNECTIONSPERBROKER, 10))
- .statsInterval(NumberUtils.toLong(config.getParams().get(KEY_STATS_INTERVAL_SECONDS), -1),
- TimeUnit.SECONDS)
- .build();
- this.baseBuilder = client.newProducer();
- // Map<String, Object> builderConf = new HashMap<>();
- // builderConf.putAll(context.getParameters());
- this.baseBuilder
- .sendTimeout(producerContext.getInteger(KEY_SENDTIMEOUT, 0), TimeUnit.MILLISECONDS)
- .maxPendingMessages(producerContext.getInteger(KEY_MAXPENDINGMESSAGES, 500))
- .maxPendingMessagesAcrossPartitions(
- producerContext.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 60000));
- this.baseBuilder
- .batchingMaxMessages(producerContext.getInteger(KEY_BATCHINGMAXMESSAGES, 500))
- .batchingMaxPublishDelay(producerContext.getInteger(KEY_BATCHINGMAXPUBLISHDELAY, 100),
- TimeUnit.MILLISECONDS)
- .batchingMaxBytes(producerContext.getInteger(KEY_BATCHINGMAXBYTES, 131072));
- this.baseBuilder
- .accessMode(ProducerAccessMode.Shared)
- .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
- .blockIfQueueFull(producerContext.getBoolean(KEY_BLOCKIFQUEUEFULL, true));
- this.baseBuilder
- .roundRobinRouterBatchingPartitionSwitchFrequency(
- producerContext.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 60))
- .enableBatching(producerContext.getBoolean(KEY_ENABLEBATCHING, true))
- .compressionType(this.getPulsarCompressionType());
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- /**
- * getPulsarCompressionType
- *
- * @return CompressionType LZ4/NONE/ZLIB/ZSTD/SNAPPY
- */
- private CompressionType getPulsarCompressionType() {
- String type = this.producerContext.getString(KEY_COMPRESSIONTYPE, CompressionType.SNAPPY.name());
- switch (type) {
- case "LZ4":
- return CompressionType.LZ4;
- case "NONE":
- return CompressionType.NONE;
- case "ZLIB":
- return CompressionType.ZLIB;
- case "ZSTD":
- return CompressionType.ZSTD;
- case "SNAPPY":
- return CompressionType.SNAPPY;
- default:
- return CompressionType.NONE;
- }
- }
-
- /**
- * stop
- */
- @Override
- public void stop() {
- super.state = LifecycleState.STOP;
- //
- for (Entry<String, Producer<byte[]>> entry : this.producerMap.entrySet()) {
- try {
- entry.getValue().close();
- } catch (PulsarClientException e) {
- LOG.error(e.getMessage(), e);
- }
- }
- try {
- this.client.close();
- } catch (PulsarClientException e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- /**
- * send DispatchProfile
- *
- * @param event DispatchProfile
- * @return boolean sendResult
- */
- @Override
- public boolean send(DispatchProfile event) {
- try {
- // topic
- String producerTopic = this.getProducerTopic(event);
- if (producerTopic == null) {
- sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
- event.fail();
- return false;
- }
- // get producer
- Producer<byte[]> producer = this.producerMap.get(producerTopic);
- if (producer == null) {
- try {
- LOG.info("try to new a object for topic " + producerTopic);
- SecureRandom secureRandom = new SecureRandom(
- (workerName + "-" + cacheClusterName + "-" + producerTopic + System.currentTimeMillis())
- .getBytes());
- String producerName = workerName + "-" + cacheClusterName + "-" + producerTopic + "-"
- + secureRandom.nextLong();
- producer = baseBuilder.clone().topic(producerTopic)
- .producerName(producerName)
- .create();
- LOG.info("create new producer success:{}", producer.getProducerName());
- Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(producerTopic, producer);
- if (oldProducer != null) {
- producer.close();
- LOG.info("close producer success:{}", producer.getProducerName());
- producer = oldProducer;
- }
- } catch (Throwable ex) {
- LOG.error("create new producer failed", ex);
- }
- }
- // create producer failed
- if (producer == null) {
- sinkContext.processSendFail(event, producerTopic, 0);
- return false;
- }
- // headers
- Map<String, String> headers = this.encodeCacheMessageHeaders(event);
- // compress
- byte[] bodyBytes = EventUtils.encodeCacheMessageBody(sinkContext.getCompressType(), event.getEvents());
- // sendAsync
- long sendTime = System.currentTimeMillis();
- CompletableFuture<MessageId> future = producer.newMessage().properties(headers)
- .value(bodyBytes).sendAsync();
- // callback
- future.whenCompleteAsync((msgId, ex) -> {
- if (ex != null) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
- sinkContext.processSendFail(event, producerTopic, sendTime);
- } else {
- sinkContext.addSendResultMetric(event, producerTopic, true, sendTime);
- event.ack();
- }
- });
- return true;
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- sinkContext.processSendFail(event, event.getUid(), 0);
- return false;
- }
- }
-
- /**
- * getProducerTopic
- *
- * @param event DispatchProfile
- * @return String Full topic name
- */
- private String getProducerTopic(DispatchProfile event) {
- String baseTopic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
- if (baseTopic == null) {
- return null;
- }
- StringBuilder builder = new StringBuilder();
- if (tenant != null) {
- builder.append(tenant).append("/");
- }
- if (namespace != null) {
- builder.append(namespace).append("/");
- }
- builder.append(baseTopic);
- return builder.toString();
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneProducer.java
deleted file mode 100644
index b2fb7819e..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneProducer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.pulsarzone;
-
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneProducer;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext;
-import org.apache.inlong.dataproxy.sink.mqzone.ZoneClusterProducerCalculator;
-
-public class PulsarZoneProducer extends AbstractZoneProducer implements ZoneClusterProducerCalculator {
-
- public PulsarZoneProducer(String workerName, AbstractZoneSinkContext context) {
- super(workerName, context);
- }
-
- /**
- * reload
- */
- public void reload() {
- super.reload(this);
- }
-
- @Override
- public AbstractZoneClusterProducer calculator(String workerName, CacheClusterConfig config,
- AbstractZoneSinkContext context) {
- return new PulsarClusterProducer(workerName, config, (PulsarZoneSinkContext) context);
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneSink.java
deleted file mode 100644
index 5052d0c47..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneSink.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.pulsarzone;
-
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * PulsarZoneSink
- */
-public class PulsarZoneSink extends AbstractZoneSink {
-
- public static final Logger LOG = LoggerFactory.getLogger(PulsarZoneSink.class);
-
- /**
- * start
- */
- @Override
- public void start() {
- try {
- super.context = new PulsarZoneSinkContext(getName(), parentContext, getChannel(), super.dispatchQueues);
- super.start((sinkName, workIndex, context) -> new PulsarZoneWorker(sinkName, workIndex,
- (PulsarZoneSinkContext) context));
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- super.start();
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneSinkContext.java
deleted file mode 100644
index 97db6dcac..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneSinkContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.pulsarzone;
-
-import java.util.ArrayList;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- *
- * PulsarZoneSinkContext
- */
-public class PulsarZoneSinkContext extends AbstractZoneSinkContext {
-
- /**
- * Constructor
- *
- * @param context
- */
- public PulsarZoneSinkContext(String sinkName, Context context, Channel channel,
- ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues) {
- super(sinkName, context, channel, dispatchQueues);
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneWorker.java
deleted file mode 100644
index 8332b3b50..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarZoneWorker.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.pulsarzone;
-
-import org.apache.inlong.dataproxy.sink.mqzone.AbstactZoneWorker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * PulsarZoneWorker
- */
-public class PulsarZoneWorker extends AbstactZoneWorker {
-
- public static final Logger LOG = LoggerFactory.getLogger(PulsarZoneWorker.class);
-
- /**
- * Constructor
- *
- * @param sinkName
- * @param workerIndex
- * @param context
- */
- public PulsarZoneWorker(String sinkName, int workerIndex, PulsarZoneSinkContext context) {
- super(sinkName, workerIndex, context,
- new PulsarZoneProducer(sinkName + "-worker-" + workerIndex, context));
-
- }
-
- /**
- * run
- */
- @Override
- public void run() {
- LOG.info(String.format("start PulsarZoneWorker:%s", super.workerName));
- super.run();
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeClusterProducer.java
deleted file mode 100644
index 5ab814ff4..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeClusterProducer.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.tubezone;
-
-import org.apache.flume.Context;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
-import org.apache.inlong.sdk.commons.protocol.EventUtils;
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
-import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
-import org.apache.inlong.tubemq.client.producer.MessageProducer;
-import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
-import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * TubeClusterProducer
- */
-public class TubeClusterProducer extends AbstractZoneClusterProducer {
-
- public static final Logger LOG = LoggerFactory.getLogger(TubeClusterProducer.class);
- private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
-
- // parameter
- private String masterHostAndPortList;
- private long linkMaxAllowedDelayedMsgCount;
- private long sessionWarnDelayedMsgCount;
- private long sessionMaxAllowedDelayedMsgCount;
- private long nettyWriteBufferHighWaterMark;
- // tube producer
- private TubeMultiSessionFactory sessionFactory;
- private MessageProducer producer;
- private Set<String> topicSet = new HashSet<>();
-
- /**
- * Constructor
- *
- * @param workerName
- * @param config
- * @param context
- */
- public TubeClusterProducer(String workerName, CacheClusterConfig config, TubeZoneSinkContext context) {
- super(workerName, config, context);
- }
-
- /**
- * start
- */
- @Override
- public void start() {
- super.state = LifecycleState.START;
- // create tube producer
- try {
- // prepare configuration
- TubeClientConfig conf = initTubeConfig();
- LOG.info("try to create producer:{}", conf.toJsonString());
- this.sessionFactory = new TubeMultiSessionFactory(conf);
- this.producer = sessionFactory.createProducer();
- LOG.info("create new producer success:{}", producer);
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- /**
- * initTubeConfig
- * @return
- *
- * @throws Exception
- */
- private TubeClientConfig initTubeConfig() throws Exception {
- // get parameter
- Context configContext = new Context(this.producerContext.getParameters());
- configContext.putAll(this.config.getParams());
- masterHostAndPortList = configContext.getString(MASTER_HOST_PORT_LIST);
- linkMaxAllowedDelayedMsgCount = configContext.getLong(ConfigConstants.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT,
- 80000L);
- sessionWarnDelayedMsgCount = configContext.getLong(ConfigConstants.SESSION_WARN_DELAYED_MSG_COUNT,
- 2000000L);
- sessionMaxAllowedDelayedMsgCount = configContext.getLong(
- ConfigConstants.SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT,
- 4000000L);
- nettyWriteBufferHighWaterMark = configContext.getLong(ConfigConstants.NETTY_WRITE_BUFFER_HIGH_WATER_MARK,
- 15 * 1024 * 1024L);
- // config
- final TubeClientConfig tubeClientConfig = new TubeClientConfig(this.masterHostAndPortList);
- tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(linkMaxAllowedDelayedMsgCount);
- tubeClientConfig.setSessionWarnDelayedMsgCount(sessionWarnDelayedMsgCount);
- tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(sessionMaxAllowedDelayedMsgCount);
- tubeClientConfig.setNettyWriteBufferHighWaterMark(nettyWriteBufferHighWaterMark);
- tubeClientConfig.setHeartbeatPeriodMs(15000L);
- tubeClientConfig.setRpcTimeoutMs(20000L);
-
- return tubeClientConfig;
- }
-
- /**
- * stop
- */
- @Override
- public void stop() {
- super.state = LifecycleState.STOP;
- // producer
- if (this.producer != null) {
- try {
- this.producer.shutdown();
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- }
- }
- if (this.sessionFactory != null) {
- try {
- this.sessionFactory.shutdown();
- } catch (TubeClientException e) {
- LOG.error(e.getMessage(), e);
- }
- }
- }
-
- /**
- * send
- *
- * @param event
- */
- @Override
- public boolean send(DispatchProfile event) {
- try {
- // topic
- String topic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
- if (topic == null) {
- sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
- return false;
- }
- // publish
- if (!this.topicSet.contains(topic)) {
- this.producer.publish(topic);
- this.topicSet.add(topic);
- }
- // create producer failed
- if (producer == null) {
- sinkContext.processSendFail(event, topic, 0);
- return false;
- }
- // headers
- Map<String, String> headers = this.encodeCacheMessageHeaders(event);
- // compress
- byte[] bodyBytes = EventUtils.encodeCacheMessageBody(sinkContext.getCompressType(), event.getEvents());
- // sendAsync
- Message message = new Message(topic, bodyBytes);
- // add headers
- headers.forEach((key, value) -> {
- message.setAttrKeyVal(key, value);
- });
- // callback
- long sendTime = System.currentTimeMillis();
- MessageSentCallback callback = new MessageSentCallback() {
-
- @Override
- public void onMessageSent(MessageSentResult result) {
- sinkContext.addSendResultMetric(event, topic, true, sendTime);
- event.ack();
- }
-
- @Override
- public void onException(Throwable ex) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
- sinkContext.processSendFail(event, topic, sendTime);
- }
- };
- producer.sendMessage(message, callback);
- return true;
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- sinkContext.processSendFail(event, event.getUid(), 0);
- return false;
- }
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneProducer.java
deleted file mode 100644
index 214bb84c0..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneProducer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.tubezone;
-
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneProducer;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext;
-import org.apache.inlong.dataproxy.sink.mqzone.ZoneClusterProducerCalculator;
-
-public class TubeZoneProducer extends AbstractZoneProducer implements ZoneClusterProducerCalculator {
-
- /**
- * Constructor
- *
- * @param workerName
- * @param context
- */
-
- public TubeZoneProducer(String workerName, TubeZoneSinkContext context) {
- super(workerName, context);
- }
-
- /**
- * reload
- */
- public void reload() {
- super.reload(this);
- }
-
- @Override
- public AbstractZoneClusterProducer calculator(String workerName, CacheClusterConfig config,
- AbstractZoneSinkContext context) {
- return new TubeClusterProducer(workerName, config, (TubeZoneSinkContext) context);
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneSink.java
deleted file mode 100644
index 764db484e..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneSink.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.tubezone;
-
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * TubeZoneSink
- */
-public class TubeZoneSink extends AbstractZoneSink {
-
- public static final Logger LOG = LoggerFactory.getLogger(TubeZoneSink.class);
-
- /**
- * start
- */
- @Override
- public void start() {
- try {
- super.context = new TubeZoneSinkContext(getName(), parentContext, getChannel(), super.dispatchQueues);
- super.start((sinkName, workIndex, context) -> {
- return new TubeZoneWorker(sinkName, workIndex, (TubeZoneSinkContext) context);
- });
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- super.start();
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneSinkContext.java
deleted file mode 100644
index 68946cc40..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneSinkContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.tubezone;
-
-import java.util.ArrayList;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
-import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- *
- * TubeZoneSinkContext
- */
-public class TubeZoneSinkContext extends AbstractZoneSinkContext {
-
- /**
- * Constructor
- *
- * @param context
- */
- public TubeZoneSinkContext(String sinkName, Context context, Channel channel,
- ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues) {
- super(sinkName, context, channel, dispatchQueues);
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneWorker.java
deleted file mode 100644
index 70d1eede9..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mqzone/impl/tubezone/TubeZoneWorker.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.mqzone.impl.tubezone;
-
-import org.apache.inlong.dataproxy.sink.mqzone.AbstactZoneWorker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * TubeZoneWorker
- */
-public class TubeZoneWorker extends AbstactZoneWorker {
-
- public static final Logger LOG = LoggerFactory.getLogger(TubeZoneWorker.class);
-
- /**
- * Constructor
- *
- * @param sinkName
- * @param workerIndex
- * @param context
- */
- public TubeZoneWorker(String sinkName, int workerIndex, TubeZoneSinkContext context) {
- super(sinkName, workerIndex, context,
- new TubeZoneProducer(sinkName + "-worker-" + workerIndex, context));
- }
-
- /**
- * run
- */
- @Override
- public void run() {
- LOG.info(String.format("start TubeZoneWorker:%s", super.workerName));
- super.run();
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/CreatePulsarClientCallBack.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/CreatePulsarClientCallBack.java
deleted file mode 100644
index d2c6c67c6..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/CreatePulsarClientCallBack.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar;
-
-public interface CreatePulsarClientCallBack {
-
- void handleCreateClientSuccess(String url);
-
- void handleCreateClientException(String url);
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
deleted file mode 100644
index d9912273b..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ /dev/null
@@ -1,563 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.sink.EventStat;
-import org.apache.inlong.dataproxy.sink.PulsarSink;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class PulsarClientService {
-
- private static final Logger logger = LoggerFactory.getLogger(PulsarClientService.class);
- private MQClusterConfig pulsarConfig;
- public Map<String, List<TopicProducerInfo>> producerInfoMap;
- public Map<String, AtomicLong> topicSendIndexMap;
- public Map<String, PulsarClient> pulsarClients = new ConcurrentHashMap<>();
- public int pulsarClientIoThreads;
- public int pulsarConnectionsPreBroker;
- /*
- * for pulsar client
- */
- private Map<String, String> pulsarUrl2token;
- private String authType;
- /*
- * for producer
- */
- /*
- * unit mills
- */
- private Integer sendTimeout;
- private Integer clientTimeout;
- private boolean enableBatch = true;
- private boolean blockIfQueueFull = true;
- private int maxPendingMessages = 10000;
- private int maxPendingMessagesAcrossPartitions = 500000;
- private CompressionType compressionType;
- private int maxBatchingBytes = 128 * 1024;
- private int maxBatchingMessages = 1000;
- private long maxBatchingPublishDelayMillis = 1;
- private long retryIntervalWhenSendMsgError = 30 * 1000L;
- private int sinkThreadPoolSize;
-
- /**
- * PulsarClientService
- *
- * @param pulsarConfig
- */
- public PulsarClientService(MQClusterConfig pulsarConfig, int sinkThreadPoolSize) {
- this.pulsarConfig = pulsarConfig;
- this.sinkThreadPoolSize = sinkThreadPoolSize;
-
- authType = pulsarConfig.getAuthType();
- sendTimeout = pulsarConfig.getSendTimeoutMs();
- retryIntervalWhenSendMsgError = pulsarConfig.getRetryIntervalWhenSendErrorMs();
- clientTimeout = pulsarConfig.getClientTimeoutSecond();
-
- Preconditions.checkArgument(sendTimeout > 0, "sendTimeout must be > 0");
-
- pulsarClientIoThreads = pulsarConfig.getPulsarClientIoThreads();
- pulsarConnectionsPreBroker = pulsarConfig.getPulsarConnectionsPreBroker();
-
- enableBatch = pulsarConfig.getEnableBatch();
- blockIfQueueFull = pulsarConfig.getBlockIfQueueFull();
- maxPendingMessages = pulsarConfig.getMaxPendingMessages();
- maxPendingMessagesAcrossPartitions = pulsarConfig.getMaxPendingMessagesAcrossPartitions();
- String compressionTypeStr = pulsarConfig.getCompressionType();
- if (StringUtils.isNotEmpty(compressionTypeStr)) {
- compressionType = CompressionType.valueOf(compressionTypeStr);
- } else {
- compressionType = CompressionType.NONE;
- }
- maxBatchingMessages = pulsarConfig.getMaxBatchingMessages();
- maxBatchingBytes = pulsarConfig.getMaxBatchingBytes();
- maxBatchingPublishDelayMillis = pulsarConfig.getMaxBatchingPublishDelayMillis();
- producerInfoMap = new ConcurrentHashMap<>();
- topicSendIndexMap = new ConcurrentHashMap<>();
- }
-
- public void initCreateConnection(CreatePulsarClientCallBack callBack, String sinkName) {
- pulsarUrl2token = ConfigManager.getInstance().getMqClusterUrl2Token();
- if (pulsarUrl2token == null || pulsarUrl2token.isEmpty()) {
- logger.warn("failed to get Pulsar Cluster, make sure register pulsar to manager successfully.");
- return;
- }
- try {
- createConnection(callBack);
- if (!ConfigManager.getInstance().isMqClusterReady()) {
- ConfigManager.getInstance().updMqClusterStatus(true);
- logger.info("[{}] MQ Cluster service status ready!", sinkName);
- }
- } catch (FlumeException e) {
- logger.error("unable to create pulsar client: ", e);
- close();
- }
- }
-
- /**
- * send message
- */
- public boolean sendMessage(int poolIndex, String topic,
- EventStat es, PulsarSink pulsarSink) {
- boolean result;
- TopicProducerInfo producerInfo = null;
- Event event = es.getEvent();
- final String pkgVersion =
- event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER);
- final String inlongGroupId =
- event.getHeaders().get(AttributeConstants.GROUP_ID);
- final String inlongStreamId =
- event.getHeaders().get(AttributeConstants.STREAM_ID);
- String errMsg = "";
- try {
- producerInfo = getProducerInfo(poolIndex, topic, inlongGroupId, inlongStreamId);
- } catch (Exception e) {
- errMsg = "Get producer failed for topic=" + topic + ", reason is " + e.getMessage();
- }
- /*
- * If the producer is a null value,\ it means that the topic is not yet ready, and it needs to be played back
- * into the file channel
- */
- if (producerInfo == null) {
- /*
- * Data within 30s is placed in the exception channel to prevent frequent checks After 30s, reopen the topic
- * check, if it is still a null value, put it back into the illegal map
- */
- pulsarSink.handleRequestProcError(topic, es,
- false, DataProxyErrCode.NO_AVAILABLE_PRODUCER, errMsg);
- return false;
- }
- TopicProducerInfo forCallBackP = producerInfo;
- Producer producer = producerInfo.getProducer(poolIndex);
- if (producer == null) {
- errMsg = "get producer is null! topic = " + topic;
- pulsarSink.handleRequestProcError(topic, es,
- false, DataProxyErrCode.PRODUCER_IS_NULL, errMsg);
- return false;
- }
- // build and send message
- Map<String, String> proMap =
- MessageUtils.getXfsAttrs(event.getHeaders(), pkgVersion);
- long startTime = System.currentTimeMillis();
- pulsarSink.getCurrentInFlightCount().incrementAndGet();
- if (es.isOrderMessage()) {
- try {
- String partitionKey =
- event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
- MessageId msgId = producer.newMessage()
- .properties(proMap)
- .key(partitionKey)
- .value(event.getBody())
- .send();
- pulsarSink.handleMessageSendSuccess(topic, msgId, es, startTime);
- forCallBackP.setCanUseSend(true);
- result = true;
- } catch (Throwable ex) {
- forCallBackP.setCanUseSend(false);
- pulsarSink.handleMessageSendException(topic, es, ex,
- DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
- result = ex instanceof NotFoundException;
- }
- } else {
- try {
- producer.newMessage().properties(proMap)
- .value(event.getBody())
- .sendAsync()
- .thenAccept((msgId) -> {
- forCallBackP.setCanUseSend(true);
- pulsarSink.handleMessageSendSuccess(topic, msgId, es, startTime);
- })
- .exceptionally((e) -> {
- forCallBackP.setCanUseSend(false);
- pulsarSink.handleMessageSendException(topic, es, e,
- DataProxyErrCode.MQ_RETURN_ERROR, e.toString());
- return null;
- });
- } catch (Throwable ex) {
- pulsarSink.getCurrentInFlightCount().decrementAndGet();
- pulsarSink.handleRequestProcError(topic, es, true,
- DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, ex.getMessage());
- }
- result = true;
- }
- return result;
- }
-
- /**
- * If this function is called successively without calling {@see #destroyConnection()}, only the
- * first call has any effect.
- *
- * @throws FlumeException if an RPC client connection could not be opened
- */
- private void createConnection(CreatePulsarClientCallBack callBack) throws FlumeException {
- if (!pulsarClients.isEmpty()) {
- return;
- }
- logger.debug("number of pulsar cluster is {}", pulsarUrl2token.size());
- for (Map.Entry<String, String> info : pulsarUrl2token.entrySet()) {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("url = {}, token = {}", info.getKey(), info.getValue());
- }
- PulsarClient client = initPulsarClient(info.getKey(), info.getValue());
- pulsarClients.put(info.getKey(), client);
- callBack.handleCreateClientSuccess(info.getKey());
- } catch (PulsarClientException e) {
- callBack.handleCreateClientException(info.getKey());
- logger.error("create connection error in Pulsar sink, "
- + "maybe pulsar master set error, please re-check. url " + info.getKey(), e);
- } catch (Throwable e) {
- callBack.handleCreateClientException(info.getKey());
- logger.error("create connection error in pulsar sink, "
- + "maybe pulsar master set error/shutdown in progress, please "
- + "re-check. url " + info.getKey(), e);
- }
- }
- if (pulsarClients.isEmpty()) {
- throw new FlumeException("connect to pulsar error, maybe zkstr/zkroot set error, please re-check");
- }
- }
-
- private PulsarClient initPulsarClient(String pulsarUrl, String token) throws Exception {
- ClientBuilder builder = PulsarClient.builder();
- if (MQClusterConfig.PULSAR_DEFAULT_AUTH_TYPE.equals(authType) && StringUtils.isNotEmpty(token)) {
- builder.authentication(AuthenticationFactory.token(token));
- }
- builder.serviceUrl(pulsarUrl)
- .ioThreads(pulsarClientIoThreads)
- .connectionsPerBroker(pulsarConnectionsPreBroker)
- .connectionTimeout(clientTimeout, TimeUnit.SECONDS)
- .statsInterval(pulsarConfig.getStatIntervalSec(),
- TimeUnit.SECONDS);
- return builder.build();
- }
-
- /**
- * Producer initialization.
- */
- public List<TopicProducerInfo> initTopicProducer(String topic, String inlongGroupId,
- String inlongStreamId) {
- List<TopicProducerInfo> producerInfoList = producerInfoMap.computeIfAbsent(topic, (k) -> {
- List<TopicProducerInfo> newList = new ArrayList<>();
- for (PulsarClient pulsarClient : pulsarClients.values()) {
- TopicProducerInfo info = new TopicProducerInfo(pulsarClient,
- sinkThreadPoolSize, topic);
- info.initProducer(inlongGroupId, inlongStreamId);
- if (info.isCanUseToSendMessage()) {
- newList.add(info);
- }
- }
- if (newList.size() == 0) {
- newList = null;
- }
- return newList;
- });
- return producerInfoList;
- }
-
- public List<TopicProducerInfo> initTopicProducer(String topic) {
- return initTopicProducer(topic, null, null);
- }
-
- public boolean destroyProducerByTopic(String topic) {
- List<TopicProducerInfo> producerInfoList = producerInfoMap.remove(topic);
- if (producerInfoList == null || producerInfoList.isEmpty()) {
- return true;
- }
- for (TopicProducerInfo producerInfo : producerInfoList) {
- if (producerInfo != null) {
- producerInfo.close();
- logger.info("destroy producer for topic={}", topic);
- }
- }
- return true;
- }
-
- private TopicProducerInfo getProducerInfo(int poolIndex, String topic, String inlongGroupId,
- String inlongStreamId) {
- List<TopicProducerInfo> producerList = initTopicProducer(topic, inlongGroupId, inlongStreamId);
- AtomicLong topicIndex = topicSendIndexMap.computeIfAbsent(topic, (k) -> new AtomicLong(0));
- int maxTryToGetProducer = producerList == null ? 0 : producerList.size();
- if (maxTryToGetProducer == 0) {
- return null;
- }
- int retryTime = 0;
- TopicProducerInfo p;
- do {
- int index = (int) (topicIndex.getAndIncrement() % maxTryToGetProducer);
- p = producerList.get(index);
- if (p.isCanUseToSendMessage() && p.getProducer(poolIndex) != null
- && p.getProducer(poolIndex).isConnected()) {
- break;
- }
- retryTime++;
- } while (retryTime < maxTryToGetProducer);
- return p;
- }
-
- public Map<String, List<TopicProducerInfo>> getProducerInfoMap() {
- return producerInfoMap;
- }
-
- private void destroyConnection() {
- producerInfoMap.clear();
- for (PulsarClient pulsarClient : pulsarClients.values()) {
- try {
- pulsarClient.shutdown();
- } catch (Exception e) {
- logger.error("destroy pulsarClient error in PulsarSink: ", e);
- }
- }
- pulsarClients.clear();
- logger.debug("closed meta producer");
- }
-
- private void removeProducers(PulsarClient pulsarClient) {
- for (List<TopicProducerInfo> producers : producerInfoMap.values()) {
- if (producers == null || producers.isEmpty()) {
- continue;
- }
- Iterator<TopicProducerInfo> it = producers.iterator();
- while (it.hasNext()) {
- TopicProducerInfo entry = it.next();
- if (entry.getPulsarClient().equals(pulsarClient)) {
- entry.close();
- it.remove();
- }
- }
- }
- }
-
- /**
- * close pulsarClients(the related url is removed); start pulsarClients for new url, and create producers for them
- *
- * @param callBack callback
- * @param needToClose url-token map
- * @param needToStart url-token map
- * @param topicSet for new pulsarClient, create these topics' producers
- */
- public void updatePulsarClients(CreatePulsarClientCallBack callBack, Map<String, String> needToClose,
- Map<String, String> needToStart, Set<String> topicSet) {
- // close
- for (String url : needToClose.keySet()) {
- PulsarClient pulsarClient = pulsarClients.get(url);
- if (pulsarClient != null) {
- try {
- removeProducers(pulsarClient);
- pulsarClient.shutdown();
- pulsarClients.remove(url);
- } catch (Exception e) {
- logger.error("shutdown pulsarClient error in PulsarSink: ", e);
- }
- }
- }
- for (Map.Entry<String, String> entry : needToStart.entrySet()) {
- String url = entry.getKey();
- String token = entry.getValue();
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("url = {}, token = {}", url, token);
- }
- PulsarClient client = initPulsarClient(url, token);
- pulsarClients.put(url, client);
- callBack.handleCreateClientSuccess(url);
-
- // create related topicProducers
- for (String topic : topicSet) {
- TopicProducerInfo info = new TopicProducerInfo(client, sinkThreadPoolSize,
- topic);
- info.initProducer();
- if (info.isCanUseToSendMessage()) {
- List<TopicProducerInfo> producerInfos = producerInfoMap.get(topic);
- if (producerInfos == null) {
- List<TopicProducerInfo> tmpProdInfos = new ArrayList<>();
- producerInfos = producerInfoMap.putIfAbsent(topic, tmpProdInfos);
- if (producerInfos == null) {
- producerInfos = tmpProdInfos;
- }
- }
- producerInfos.add(info);
- }
- }
- } catch (PulsarClientException e) {
- callBack.handleCreateClientException(url);
- logger.error("create connection error in pulsar sink, "
- + "maybe pulsar master set error, please re-check.url " + url, e);
- } catch (Throwable e) {
- callBack.handleCreateClientException(url);
- logger.error("create connection error in pulsar sink, "
- + "maybe pulsar master set error/shutdown in progress, please "
- + "re-check. url " + url, e);
- }
- }
- }
-
- /**
- * get inlong stream id from event
- *
- * @param event event
- * @return inlong stream id
- */
- private String getInlongStreamId(Event event) {
- String streamId = "";
- if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
- streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
- } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
- streamId = event.getHeaders().get(AttributeConstants.INAME);
- }
- return streamId;
- }
-
- /**
- * get inlong group id from event
- *
- * @param event event
- * @return inlong group id
- */
- private String getInlongGroupId(Event event) {
- return event.getHeaders().get(AttributeConstants.GROUP_ID);
- }
-
- public void close() {
- destroyConnection();
- }
-
- class TopicProducerInfo {
-
- private final Producer[] producers;
- private final PulsarClient pulsarClient;
- private final int sinkThreadPoolSize;
- private final String topic;
- private long lastSendMsgErrorTime;
- private volatile Boolean isCanUseSend = true;
-
- private volatile Boolean isFinishInit = false;
-
- public TopicProducerInfo(PulsarClient pulsarClient, int sinkThreadPoolSize, String topic) {
- this.pulsarClient = pulsarClient;
- this.sinkThreadPoolSize = sinkThreadPoolSize;
- this.topic = topic;
- this.producers = new Producer[sinkThreadPoolSize];
- }
-
- public void initProducer() {
- initProducer(null, null);
- }
-
- public void initProducer(String inlongGroupId, String inlongStreamId) {
- try {
- for (int i = 0; i < sinkThreadPoolSize; i++) {
- producers[i] = createProducer();
- }
- isFinishInit = true;
- } catch (PulsarClientException e) {
- logger.error("create pulsar client has error, topic = {}, inlongGroupId = {}, inlongStreamId= {}",
- topic, inlongGroupId, inlongStreamId, e);
- isFinishInit = false;
- for (int i = 0; i < sinkThreadPoolSize; i++) {
- if (producers[i] != null) {
- producers[i].closeAsync();
- }
- }
- }
- }
-
- private Producer createProducer() throws PulsarClientException {
- return pulsarClient.newProducer().sendTimeout(sendTimeout, TimeUnit.MILLISECONDS)
- .topic(topic)
- .enableBatching(enableBatch)
- .blockIfQueueFull(blockIfQueueFull)
- .maxPendingMessages(maxPendingMessages)
- .maxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions)
- .compressionType(compressionType)
- .batchingMaxMessages(maxBatchingMessages)
- .batchingMaxBytes(maxBatchingBytes)
- .batchingMaxPublishDelay(maxBatchingPublishDelayMillis, TimeUnit.MILLISECONDS)
- .create();
- }
-
- public void setCanUseSend(Boolean isCanUseSend) {
- this.isCanUseSend = isCanUseSend;
- if (!isCanUseSend) {
- lastSendMsgErrorTime = System.currentTimeMillis();
- }
- }
-
- public boolean isCanUseToSendMessage() {
- if (isCanUseSend && isFinishInit) {
- return true;
- } else if (isFinishInit
- && (System.currentTimeMillis() - lastSendMsgErrorTime) > retryIntervalWhenSendMsgError) {
- lastSendMsgErrorTime = System.currentTimeMillis();
- return true;
- }
- return false;
- }
-
- public void close() {
- try {
- for (int i = 0; i < sinkThreadPoolSize; i++) {
- if (producers[i] != null) {
- producers[i].close();
- }
- }
- } catch (PulsarClientException e) {
- logger.error("close pulsar producer has error: ", e);
- }
- }
-
- public Producer getProducer(int poolIndex) {
- if (poolIndex >= sinkThreadPoolSize || producers[poolIndex] == null) {
- return producers[0];
- }
- return producers[poolIndex];
- }
-
- public PulsarClient getPulsarClient() {
- return pulsarClient;
- }
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
deleted file mode 100644
index bd1d9cd29..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar;
-
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.dataproxy.sink.EventStat;
-
-public interface SendMessageCallBack {
-
- void handleMessageSendSuccess(String topic, Object msgId, EventStat es, long startTime);
-
- void handleRequestProcError(String topic, EventStat es,
- boolean needRetry, DataProxyErrCode errCode, String errMsg);
-
- void handleMessageSendException(String topic, EventStat es, Object exception,
- DataProxyErrCode errCode, String errMsg);
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
deleted file mode 100644
index c97e45c70..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar;
-
-import com.google.common.cache.LoadingCache;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flume.Event;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.monitor.LogCounter;
-import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.sink.EventStat;
-import org.apache.inlong.dataproxy.sink.PulsarSink;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SinkTask extends Thread {
-
- private static final Logger logger = LoggerFactory.getLogger(SinkTask.class);
-
- private static final LogCounter logPrinterA = new LogCounter(10, 100000, 60 * 1000);
-
- /*
- * default value
- */
- private static int BATCH_SIZE = 10000;
-
- private PulsarClientService pulsarClientService;
-
- private PulsarSink pulsarSink;
-
- private long logCounter = 0;
-
- private int poolIndex = 0;
-
- private LinkedBlockingQueue<EventStat> eventQueue;
-
- private LinkedBlockingQueue<EventStat> resendQueue;
-
- private AtomicLong currentInFlightCount;
-
- private SinkCounter sinkCounter;
-
- private LoadingCache<String, Long> agentIdCache;
-
- private MQClusterConfig pulsarConfig;
-
- private int maxRetrySendCnt;
- /*
- * whether the SendTask thread can send data to pulsar
- */
- private volatile boolean canSend = false;
-
- public SinkTask(PulsarClientService pulsarClientService, PulsarSink pulsarSink,
- int eventQueueSize,
- int badEventQueueSize, int poolIndex, boolean canSend) {
- this.pulsarClientService = pulsarClientService;
- this.pulsarSink = pulsarSink;
- this.poolIndex = poolIndex;
- this.canSend = canSend;
- this.currentInFlightCount = pulsarSink.getCurrentInFlightCount();
- this.sinkCounter = pulsarSink.getSinkCounter();
- this.agentIdCache = pulsarSink.getAgentIdCache();
- this.pulsarConfig = pulsarSink.getPulsarConfig();
- this.maxRetrySendCnt = pulsarSink.getMaxRetrySendCnt();
- eventQueue = new LinkedBlockingQueue<>(eventQueueSize);
- resendQueue = new LinkedBlockingQueue<>(badEventQueueSize);
- }
-
- public boolean processEvent(EventStat eventStat) {
- try {
- return eventQueue.offer(eventStat, 3 * 1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- logger.error("InterruptedException e", e);
- }
- return false;
- }
-
- public boolean processReSendEvent(EventStat eventStat) {
- return resendQueue.offer(eventStat);
- }
-
- public boolean isAllSendFinished() {
- return eventQueue.size() == 0;
- }
-
- public void close() {
- canSend = false;
- }
-
- @Override
- public void run() {
- logger.info("Sink task {} started.", Thread.currentThread().getName());
- while (canSend) {
- Event event = null;
- EventStat eventStat = null;
- String topic = null;
- try {
- if (!resendQueue.isEmpty()) {
- /*
- * Send the data in the retry queue first
- */
- eventStat = resendQueue.poll();
- if (eventStat != null) {
- event = eventStat.getEvent();
- }
- } else {
- if (currentInFlightCount.get() > BATCH_SIZE) {
- /*
- * Under the condition that the number of unresponsive messages is greater than 1w, the number
- * of unresponsive messages sent to pulsar will be printed periodically
- */
- logCounter++;
- if (logCounter == 1 || logCounter % 100000 == 0) {
- logger.info(getName()
- + " currentInFlightCount={} resendQueue"
- + ".size={}",
- currentInFlightCount.get(), resendQueue.size());
- }
- if (logCounter > Long.MAX_VALUE - 10) {
- logCounter = 0;
- }
- }
- eventStat = eventQueue.take();
- sinkCounter.incrementEventDrainAttemptCount();
- event = eventStat.getEvent();
- }
- // check event status
- if (event == null) {
- logger.warn("Event is null!");
- continue;
- }
- // check whether discard or send event
- if (eventStat.getRetryCnt() > maxRetrySendCnt) {
- logger.warn("Message will be discard! send times reach to max retry cnt."
- + " max retry cnt = {}", maxRetrySendCnt);
- continue;
- }
- // get topic
- topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
- if (StringUtils.isEmpty(topic)) {
- String groupId = event.getHeaders().get(AttributeConstants.GROUP_ID);
- String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
- topic = MessageUtils.getTopic(pulsarSink.getTopicsProperties(), groupId, streamId);
- }
- if (topic == null || topic.equals("")) {
- pulsarSink.handleRequestProcError(topic, eventStat,
- false, DataProxyErrCode.TOPIC_IS_BLANK, "");
- continue;
- }
- // check whether duplicated event
- String clientSeqId = event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
- if (pulsarConfig.getClientIdCache() && clientSeqId != null) {
- boolean hasSend = agentIdCache.asMap().containsKey(clientSeqId);
- agentIdCache.put(clientSeqId, System.currentTimeMillis());
- if (hasSend) {
- pulsarSink.handleRequestProcError(topic, eventStat,
- false, DataProxyErrCode.DUPLICATED_MESSAGE,
- "Duplicated message by uuid = " + clientSeqId);
- if (logPrinterA.shouldPrint()) {
- logger.info("{} agent package {} existed,just discard.",
- getName(), clientSeqId);
- }
- continue;
- }
- }
- // send message
- pulsarClientService.sendMessage(poolIndex, topic, eventStat, pulsarSink);
- } catch (InterruptedException e) {
- logger.error("Thread {} has been interrupted!",
- Thread.currentThread().getName());
- return;
- } catch (Throwable t) {
- if (t instanceof PulsarClientException) {
- String message = t.getMessage();
- if (message != null && (message.contains("No available queue for topic")
- || message.contains("The brokers of topic are all forbidden"))) {
- logger.info("IllegalTopicMap.put " + topic);
- continue;
- } else {
- try {
- /*
- * The exception of pulsar will cause the sending thread to block and prevent further
- * pressure on pulsar. Here you should pay attention to the type of exception to prevent the
- * error of a topic from affecting the global
- */
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // ignore..
- }
- }
- }
- if (logPrinterA.shouldPrint()) {
- logger.error("Sink task fail to send the message, sink.name="
- + Thread.currentThread().getName()
- + ",event.headers="
- + eventStat.getEvent().getHeaders(), t);
- }
- /*
- * producer.sendMessage is abnormal, so currentInFlightCount is not added, so there is no need to
- * subtract
- */
- pulsarSink.handleRequestProcError(topic, eventStat, false,
- DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, t.getMessage());
- }
- }
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java
deleted file mode 100644
index 317da28f5..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * PulsarSetSink
- */
-public class PulsarFederationSink extends AbstractSink implements Configurable {
-
- public static final Logger LOG = LoggerFactory.getLogger(PulsarFederationSink.class);
-
- private PulsarFederationSinkContext context;
- private List<PulsarFederationWorker> workers = new ArrayList<>();
- private Map<String, String> dimensions;
-
- /**
- * start
- */
- @Override
- public void start() {
- String sinkName = this.getName();
- // create worker
- for (int i = 0; i < context.getMaxThreads(); i++) {
- PulsarFederationWorker worker = new PulsarFederationWorker(sinkName, i, context);
- worker.start();
- this.workers.add(worker);
- }
- super.start();
- }
-
- /**
- * stop
- */
- @Override
- public void stop() {
- for (PulsarFederationWorker worker : workers) {
- try {
- worker.close();
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- }
- }
- this.context.close();
- super.stop();
- }
-
- /**
- * configure
- *
- * @param context
- */
- @Override
- public void configure(Context context) {
- LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
- this.context = new PulsarFederationSinkContext(this.getName(), context);
- this.dimensions = new HashMap<>();
- this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.context.getProxyClusterId());
- this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
- }
-
- /**
- * process
- *
- * @return Status
- * @throws EventDeliveryException
- */
- @Override
- public Status process() throws EventDeliveryException {
- Channel channel = getChannel();
- Transaction tx = channel.getTransaction();
- tx.begin();
- try {
- Event event = channel.take();
- if (event == null) {
- tx.commit();
- return Status.BACKOFF;
- }
- //
- int eventSize = event.getBody().length;
- if (!this.context.getBufferQueue().tryAcquire(eventSize)) {
- // record the failure of queue full for monitor
- // metric
- DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
- metricItem.readFailCount.incrementAndGet();
- metricItem.readFailSize.addAndGet(eventSize);
- //
- tx.rollback();
- return Status.BACKOFF;
- }
- this.context.getBufferQueue().offer(event);
- tx.commit();
- return Status.READY;
- } catch (Throwable t) {
- LOG.error("Process event failed!" + this.getName(), t);
- try {
- tx.rollback();
- // metric
- DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
- metricItem.readFailCount.incrementAndGet();
- } catch (Throwable e) {
- LOG.error("Channel take transaction rollback exception:" + getName(), e);
- }
- return Status.BACKOFF;
- } finally {
- tx.close();
- }
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSinkContext.java
deleted file mode 100644
index f57e4bbe0..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSinkContext.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import java.util.Map;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.apache.inlong.dataproxy.utils.BufferQueue;
-
-/**
- *
- * PulsarFederationContext
- */
-public class PulsarFederationSinkContext {
-
- public static final String KEY_MAX_THREADS = "max-threads";
- public static final String KEY_MAXTRANSACTION = "maxTransaction";
- public static final String KEY_PROCESSINTERVAL = "processInterval";
- public static final String KEY_RELOADINTERVAL = "reloadInterval";
- public static final String KEY_MAXBUFFERQUEUESIZE = "maxBufferQueueSize";
- public static final String PREFIX_PRODUCER = "producer.";
-
- private final String proxyClusterId;
- private final Context sinkContext;
- private final Context producerContext;
- //
- private final IdTopicConfigHolder idTopicHolder;
- private final CacheClusterConfigHolder cacheHolder;
- private final BufferQueue<Event> bufferQueue;
- //
- private final int maxThreads;
- private final int maxTransaction;
- private final long processInterval;
- private final long reloadInterval;
- //
- private final DataProxyMetricItemSet metricItemSet;
-
- /**
- * Constructor
- *
- * @param context
- */
- public PulsarFederationSinkContext(String sinkName, Context context) {
- this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
- this.sinkContext = context;
- this.maxThreads = context.getInteger(KEY_MAX_THREADS, 10);
- this.maxTransaction = context.getInteger(KEY_MAXTRANSACTION, 1);
- this.processInterval = context.getInteger(KEY_PROCESSINTERVAL, 100);
- this.reloadInterval = context.getLong(KEY_RELOADINTERVAL, 60000L);
- //
- this.idTopicHolder = new IdTopicConfigHolder();
- this.idTopicHolder.configure(context);
- this.idTopicHolder.start();
- //
- this.cacheHolder = new CacheClusterConfigHolder();
- this.cacheHolder.configure(context);
- this.cacheHolder.start();
- //
- int maxBufferQueueSize = context.getInteger(KEY_MAXBUFFERQUEUESIZE, 128 * 1024);
- this.bufferQueue = new BufferQueue<Event>(maxBufferQueueSize);
- //
- Map<String, String> producerParams = context.getSubProperties(PREFIX_PRODUCER);
- this.producerContext = new Context(producerParams);
- //
- this.metricItemSet = new DataProxyMetricItemSet(sinkName);
- MetricRegister.register(this.metricItemSet);
- }
-
- /**
- * close
- */
- public void close() {
- this.idTopicHolder.close();
- this.cacheHolder.close();
- }
-
- /**
- * get proxyClusterId
- *
- * @return the proxyClusterId
- */
- public String getProxyClusterId() {
- return proxyClusterId;
- }
-
- /**
- * get sinkContext
- *
- * @return the sinkContext
- */
- public Context getSinkContext() {
- return sinkContext;
- }
-
- /**
- * get producerContext
- *
- * @return the producerContext
- */
- public Context getProducerContext() {
- return producerContext;
- }
-
- /**
- * get idTopicHolder
- *
- * @return the idTopicHolder
- */
- public IdTopicConfigHolder getIdTopicHolder() {
- return idTopicHolder;
- }
-
- /**
- * get cacheHolder
- *
- * @return the cacheHolder
- */
- public CacheClusterConfigHolder getCacheHolder() {
- return cacheHolder;
- }
-
- /**
- * get bufferQueue
- *
- * @return the bufferQueue
- */
- public BufferQueue<Event> getBufferQueue() {
- return bufferQueue;
- }
-
- /**
- * get maxThreads
- *
- * @return the maxThreads
- */
- public int getMaxThreads() {
- return maxThreads;
- }
-
- /**
- * get maxTransaction
- *
- * @return the maxTransaction
- */
- public int getMaxTransaction() {
- return maxTransaction;
- }
-
- /**
- * get processInterval
- *
- * @return the processInterval
- */
- public long getProcessInterval() {
- return processInterval;
- }
-
- /**
- * get reloadInterval
- *
- * @return the reloadInterval
- */
- public long getReloadInterval() {
- return reloadInterval;
- }
-
- /**
- * get metricItemSet
- *
- * @return the metricItemSet
- */
- public DataProxyMetricItemSet getMetricItemSet() {
- return metricItemSet;
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
deleted file mode 100644
index b8070176a..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flume.Event;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
-import org.apache.inlong.dataproxy.utils.Constants;
-import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * PulsarSetWorker
- */
-public class PulsarFederationWorker extends Thread {
-
- public static final Logger LOG = LoggerFactory.getLogger(PulsarFederationWorker.class);
-
- private final String workerName;
- private final PulsarFederationSinkContext context;
-
- private PulsarProducerFederation producerFederation;
- private LifecycleState status;
- private Map<String, String> dimensions;
-
- /**
- * Constructor
- *
- * @param sinkName
- * @param workerIndex
- * @param context
- */
- public PulsarFederationWorker(String sinkName, int workerIndex, PulsarFederationSinkContext context) {
- super();
- this.workerName = sinkName + "-worker-" + workerIndex;
- this.context = context;
- this.producerFederation = new PulsarProducerFederation(workerName, this.context);
- this.status = LifecycleState.IDLE;
- this.dimensions = new HashMap<>();
- this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.context.getProxyClusterId());
- this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, sinkName);
- }
-
- /**
- * start
- */
- @Override
- public void start() {
- this.producerFederation.start();
- this.status = LifecycleState.START;
- super.start();
- }
-
- /**
- *
- * close
- */
- public void close() {
- // close all producers
- this.producerFederation.close();
- this.status = LifecycleState.STOP;
- }
-
- /**
- * run
- */
- @Override
- public void run() {
- LOG.info(String.format("start PulsarSetWorker:%s", this.workerName));
- while (status != LifecycleState.STOP) {
- try {
- Event currentRecord = context.getBufferQueue().pollRecord();
- if (currentRecord == null) {
- Thread.sleep(context.getProcessInterval());
- continue;
- }
- // fill topic
- this.fillTopic(currentRecord);
- // metric
- DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
- this.dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
- currentRecord.getHeaders().get(Constants.TOPIC));
- long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
- System.currentTimeMillis());
- long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
- dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
- DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
- metricItem.sendCount.incrementAndGet();
- metricItem.sendSize.addAndGet(currentRecord.getBody().length);
- // send
- this.producerFederation.send(currentRecord);
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- this.sleepOneInterval();
- }
- }
- }
-
- /**
- * fillTopic
- *
- * @param currentRecord
- */
- private void fillTopic(Event currentRecord) {
- Map<String, String> headers = currentRecord.getHeaders();
- String inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
- String inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
- String uid = IdTopicConfig.generateUid(inlongGroupId, inlongStreamId);
- String topic = this.context.getIdTopicHolder().getTopic(uid);
- if (!StringUtils.isBlank(topic)) {
- headers.put(Constants.TOPIC, topic);
- }
- }
-
- /**
- * sleepOneInterval
- */
- private void sleepOneInterval() {
- try {
- Thread.sleep(context.getProcessInterval());
- } catch (InterruptedException e1) {
- LOG.error(e1.getMessage(), e1);
- }
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
deleted file mode 100644
index 8175f7bc9..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import java.security.SecureRandom;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
-import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
-import org.apache.inlong.dataproxy.utils.Constants;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageRoutingMode;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerAccessMode;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SizeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * PulsarProducerCluster
- */
-public class PulsarProducerCluster implements LifecycleAware {
-
- public static final Logger LOG = LoggerFactory.getLogger(PulsarProducerCluster.class);
-
- public static final String KEY_SERVICE_URL = "serviceUrl";
- public static final String KEY_AUTHENTICATION = "authentication";
- public static final String KEY_STATS_INTERVAL_SECONDS = "statsIntervalSeconds";
-
- public static final String KEY_ENABLEBATCHING = "enableBatching";
- public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes";
- public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages";
- public static final String KEY_BATCHINGMAXPUBLISHDELAY = "batchingMaxPublishDelay";
- public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages";
- public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS = "maxPendingMessagesAcrossPartitions";
- public static final String KEY_SENDTIMEOUT = "sendTimeout";
- public static final String KEY_COMPRESSIONTYPE = "compressionType";
- public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull";
- public static final String KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouter"
- + "BatchingPartitionSwitchFrequency";
-
- public static final String KEY_IOTHREADS = "ioThreads";
- public static final String KEY_MEMORYLIMIT = "memoryLimit";
- public static final String KEY_CONNECTIONSPERBROKER = "connectionsPerBroker";
-
- private final String workerName;
- private final CacheClusterConfig config;
- private final PulsarFederationSinkContext sinkContext;
- private final Context context;
- private final String cacheClusterName;
- private LifecycleState state;
-
- /**
- * pulsar client
- */
- private PulsarClient client;
- private ProducerBuilder<byte[]> baseBuilder;
-
- private Map<String, Producer<byte[]>> producerMap = new ConcurrentHashMap<>();
-
- /**
- * Constructor
- *
- * @param workerName
- * @param config
- * @param context
- */
- public PulsarProducerCluster(String workerName, CacheClusterConfig config, PulsarFederationSinkContext context) {
- this.workerName = workerName;
- this.config = config;
- this.sinkContext = context;
- this.context = context.getProducerContext();
- this.state = LifecycleState.IDLE;
- this.cacheClusterName = config.getClusterName();
- }
-
- /**
- * start
- */
- @Override
- public void start() {
- this.state = LifecycleState.START;
- // create pulsar client
- try {
- String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
- String authentication = config.getParams().get(KEY_AUTHENTICATION);
- this.client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .authentication(AuthenticationFactory.token(authentication))
- .ioThreads(context.getInteger(KEY_IOTHREADS, 1))
- .memoryLimit(context.getLong(KEY_MEMORYLIMIT, 1073741824L), SizeUnit.BYTES)
- .connectionsPerBroker(context.getInteger(KEY_CONNECTIONSPERBROKER, 10))
- .statsInterval(NumberUtils.toLong(config.getParams().get(KEY_STATS_INTERVAL_SECONDS), -1),
- TimeUnit.SECONDS)
- .build();
- this.baseBuilder = client.newProducer();
- // Map<String, Object> builderConf = new HashMap<>();
- // builderConf.putAll(context.getParameters());
- this.baseBuilder
- .sendTimeout(context.getInteger(KEY_SENDTIMEOUT, 0), TimeUnit.MILLISECONDS)
- .maxPendingMessages(context.getInteger(KEY_MAXPENDINGMESSAGES, 500))
- .maxPendingMessagesAcrossPartitions(
- context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 60000))
- .batchingMaxMessages(context.getInteger(KEY_BATCHINGMAXMESSAGES, 500));
- this.baseBuilder
- .batchingMaxPublishDelay(context.getInteger(KEY_BATCHINGMAXPUBLISHDELAY, 100),
- TimeUnit.MILLISECONDS);
- this.baseBuilder
- .batchingMaxBytes(context.getInteger(KEY_BATCHINGMAXBYTES, 131072));
- this.baseBuilder
- .accessMode(ProducerAccessMode.Shared)
- .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
- .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, true));
- this.baseBuilder
- .roundRobinRouterBatchingPartitionSwitchFrequency(
- context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 60))
- .enableBatching(context.getBoolean(KEY_ENABLEBATCHING, true))
- .compressionType(this.getPulsarCompressionType());
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- /**
- * getPulsarCompressionType
- *
- * @return CompressionType
- */
- private CompressionType getPulsarCompressionType() {
- String type = this.context.getString(KEY_COMPRESSIONTYPE);
- switch (type) {
- case "LZ4":
- return CompressionType.LZ4;
- case "NONE":
- return CompressionType.NONE;
- case "ZLIB":
- return CompressionType.ZLIB;
- case "ZSTD":
- return CompressionType.ZSTD;
- case "SNAPPY":
- return CompressionType.SNAPPY;
- default:
- return CompressionType.NONE;
- }
- }
-
- /**
- * stop
- */
- @Override
- public void stop() {
- this.state = LifecycleState.STOP;
- //
- for (Entry<String, Producer<byte[]>> entry : this.producerMap.entrySet()) {
- try {
- entry.getValue().close();
- } catch (PulsarClientException e) {
- LOG.error(e.getMessage(), e);
- }
- }
- try {
- this.client.close();
- } catch (PulsarClientException e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- /**
- * getLifecycleState
- *
- * @return
- */
- @Override
- public LifecycleState getLifecycleState() {
- return state;
- }
-
- /**
- * send
- *
- * @param event
- */
- public boolean send(Event event) {
- // send
- Map<String, String> headers = event.getHeaders();
- String topic = headers.get(Constants.TOPIC);
- // get producer
- Producer<byte[]> producer = this.producerMap.get(topic);
- if (producer == null) {
- try {
- LOG.info("try to new a object for topic " + topic);
- SecureRandom secureRandom = new SecureRandom(
- (workerName + "-" + cacheClusterName + "-" + topic + System.currentTimeMillis()).getBytes());
- String producerName = workerName + "-" + cacheClusterName + "-" + topic + "-" + secureRandom.nextLong();
- producer = baseBuilder.clone().topic(topic)
- .producerName(producerName)
- .create();
- LOG.info("create new producer success:{}", producer.getProducerName());
- Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(topic, producer);
- if (oldProducer != null) {
- producer.close();
- LOG.info("close producer success:{}", producer.getProducerName());
- producer = oldProducer;
- }
- } catch (Throwable ex) {
- LOG.error("create new producer failed", ex);
- }
- }
- // create producer failed
- if (producer == null) {
- sinkContext.getBufferQueue().release(event.getBody().length);
- this.addMetric(event, topic, false, 0);
- return false;
- }
- // sendAsync
- CompletableFuture<MessageId> future = null;
- String messageKey = headers.get(Constants.MESSAGE_KEY);
- long sendTime = System.currentTimeMillis();
- if (messageKey == null) {
- future = producer.newMessage().properties(headers)
- .value(event.getBody()).sendAsync();
- } else {
- future = producer.newMessage().key(messageKey).properties(headers)
- .value(event.getBody()).sendAsync();
- }
- // callback
- future.whenCompleteAsync((msgId, ex) -> {
- if (ex != null) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
- sinkContext.getBufferQueue().offer(event);
- this.addMetric(event, topic, false, 0);
- } else {
- sinkContext.getBufferQueue().release(event.getBody().length);
- this.addMetric(event, topic, true, sendTime);
- }
- });
- return true;
- }
-
- /**
- * addMetric
- *
- * @param event
- * @param topic
- * @param result
- * @param sendTime
- */
- private void addMetric(Event event, String topic, boolean result, long sendTime) {
- // metric
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.sinkContext.getProxyClusterId());
- dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.cacheClusterName);
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
- DataProxyMetricItem.fillInlongId(event, dimensions);
- DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
- DataProxyMetricItem metricItem = this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
- if (result) {
- metricItem.sendSuccessCount.incrementAndGet();
- metricItem.sendSuccessSize.addAndGet(event.getBody().length);
- AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
- if (sendTime > 0) {
- long msgTime = AuditUtils.getLogTime(event);
- long currentTime = System.currentTimeMillis();
- long sinkDuration = currentTime - sendTime;
- long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
- long wholeDuration = currentTime - msgTime;
- metricItem.sinkDuration.addAndGet(sinkDuration);
- metricItem.nodeDuration.addAndGet(nodeDuration);
- metricItem.wholeDuration.addAndGet(wholeDuration);
- }
- } else {
- metricItem.sendFailCount.incrementAndGet();
- metricItem.sendFailSize.addAndGet(event.getBody().length);
- }
- }
-
- /**
- * get cacheClusterName
- *
- * @return the cacheClusterName
- */
- public String getCacheClusterName() {
- return cacheClusterName;
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerFederation.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerFederation.java
deleted file mode 100644
index 7bc5153bd..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerFederation.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flume.Event;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * PulsarProducerSet
- */
-public class PulsarProducerFederation {
-
- public static final Logger LOG = LoggerFactory.getLogger(PulsarProducerFederation.class);
-
- private final String workerName;
- private final PulsarFederationSinkContext context;
- private Timer reloadTimer;
-
- private List<PulsarProducerCluster> clusterList = new ArrayList<>();
- private List<PulsarProducerCluster> deletingClusterList = new ArrayList<>();
-
- private AtomicInteger clusterIndex = new AtomicInteger(0);
-
- /**
- * Constructor
- *
- * @param workerName
- * @param context
- */
- public PulsarProducerFederation(String workerName, PulsarFederationSinkContext context) {
- this.workerName = workerName;
- this.context = context;
- }
-
- /**
- * start
- */
- public void start() {
- try {
- this.reload();
- this.setReloadTimer();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- /**
- * close
- */
- public void close() {
- try {
- this.reloadTimer.cancel();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- for (PulsarProducerCluster cluster : this.clusterList) {
- cluster.stop();
- }
- }
-
- /**
- * setReloadTimer
- */
- private void setReloadTimer() {
- reloadTimer = new Timer(true);
- TimerTask task = new TimerTask() {
-
- public void run() {
- reload();
- }
- };
- reloadTimer.schedule(task, new Date(System.currentTimeMillis() + context.getReloadInterval()),
- context.getReloadInterval());
- }
-
- /**
- * reload
- */
- public void reload() {
- try {
- // stop deleted cluster
- deletingClusterList.forEach(item -> {
- item.stop();
- });
- deletingClusterList.clear();
- // update cluster list
- List<CacheClusterConfig> configList = this.context.getCacheHolder().getConfigList();
- List<PulsarProducerCluster> newClusterList = new ArrayList<>(configList.size());
- // prepare
- Set<String> newClusterNames = new HashSet<>();
- configList.forEach(item -> {
- newClusterNames.add(item.getClusterName());
- });
- Set<String> oldClusterNames = new HashSet<>();
- clusterList.forEach(item -> {
- oldClusterNames.add(item.getCacheClusterName());
- });
- // add
- for (CacheClusterConfig config : configList) {
- if (!oldClusterNames.contains(config.getClusterName())) {
- PulsarProducerCluster cluster = new PulsarProducerCluster(workerName, config, context);
- cluster.start();
- newClusterList.add(cluster);
- }
- }
- // remove
- for (PulsarProducerCluster cluster : this.clusterList) {
- if (newClusterNames.contains(cluster.getCacheClusterName())) {
- newClusterList.add(cluster);
- } else {
- deletingClusterList.add(cluster);
- }
- }
- this.clusterList = newClusterList;
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- /**
- * send
- *
- * @param event
- */
- public boolean send(Event event) {
- int currentIndex = clusterIndex.getAndIncrement();
- if (currentIndex > Integer.MAX_VALUE / 2) {
- clusterIndex.set(0);
- }
- List<PulsarProducerCluster> currentClusterList = this.clusterList;
- int currentSize = currentClusterList.size();
- int realIndex = currentIndex % currentSize;
- PulsarProducerCluster clusterProducer = currentClusterList.get(realIndex);
- return clusterProducer.send(event);
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java
deleted file mode 100644
index bf2bd258d..000000000
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink;
-
-import com.google.common.base.Charsets;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.event.EventBuilder;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestPulsarSink {
-
- private MemoryChannel channel;
-
- @Before
- public void setUp() {
- PulsarSink sink = new PulsarSink();
- channel = new MemoryChannel();
- Context context = new Context();
- context.put("type", "org.apache.inlong.dataproxy.sink.PulsarSink");
- sink.setChannel(channel);
-
- this.channel.configure(context);
- }
-
- @Test
- public void testProcess() {
- Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
-
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- for (int i = 0; i < 10; i++) {
- channel.put(event);
- }
- transaction.commit();
- transaction.close();
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestTubeSink.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestTubeSink.java
deleted file mode 100644
index 287bf0009..000000000
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestTubeSink.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink;
-
-import com.google.common.base.Charsets;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.event.EventBuilder;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestTubeSink {
-
- private MemoryChannel channel;
-
- @Before
- public void setUp() {
- TubeSink sink = new TubeSink();
- channel = new MemoryChannel();
- Context context = new Context();
- context.put("type", "org.apache.inlong.dataproxy.sink.TubeSink");
- sink.setChannel(channel);
-
- this.channel.configure(context);
- }
-
- @Test
- public void testProcess() {
- Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
-
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- for (int i = 0; i < 10; i++) {
- channel.put(event);
- }
- transaction.commit();
- transaction.close();
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
deleted file mode 100644
index 9d617562b..000000000
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.dataproxy.config.loader.TestContextIdTopicConfigLoader;
-import org.apache.inlong.dataproxy.utils.MockUtils;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * TestPulsarFederationSink
- */
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.management.*")
-@PrepareForTest({MetricRegister.class})
-public class TestPulsarFederationSink {
-
- public static final Logger LOG = LoggerFactory.getLogger(TestContextIdTopicConfigLoader.class);
- public static Context context;
- public static Context sinkContext;
- public static PulsarFederationSink sinkObj;
-
- /**
- * setup
- */
- @BeforeClass
- public static void setUp() {
- Map<String, String> result = new ConcurrentHashMap<>();
- try (InputStream inStream = TestPulsarFederationSink.class.getClassLoader().getResource(
- "dataproxy-pulsar.conf")
- .openStream()) {
- Properties props = new Properties();
- props.load(inStream);
- for (Map.Entry<Object, Object> entry : props.entrySet()) {
- result.put((String) entry.getKey(), (String) entry.getValue());
- }
- context = new Context(result);
- sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
- } catch (Exception e) {
- LOG.error("fail to load properties, file ={}, and e= {}", "dataproxy-pulsar.conf", e);
- }
- sinkObj = new PulsarFederationSink();
- sinkObj.configure(sinkContext);
- }
-
- /**
- * testResult
- */
- @Test
- public void testResult() throws Exception {
- MockUtils.mockMetricRegister();
- // mock
- Channel channel = MockUtils.mockChannel();
- sinkObj.setChannel(channel);
- sinkObj.process();
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
deleted file mode 100644
index b161218b4..000000000
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.dataproxy.utils.MockUtils;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * TestPulsarProducerFederation
- */
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.management.*")
-@PrepareForTest({PulsarClient.class, ClientBuilder.class, MessageId.class,
- Producer.class, ProducerBuilder.class, TypedMessageBuilder.class, MetricRegister.class})
-public class TestPulsarProducerFederation {
-
- public static final Logger LOG = LoggerFactory.getLogger(TestPulsarProducerFederation.class);
- public static Context context;
- public static Context sinkContext;
-
- /**
- * setup
- */
- @BeforeClass
- public static void setUp() {
- Map<String, String> result = new ConcurrentHashMap<>();
- try (InputStream inStream = TestPulsarFederationSink.class.getClassLoader().getResource(
- "dataproxy-pulsar.conf").openStream()) {
- MockUtils.mockMetricRegister();
- Properties props = new Properties();
- props.load(inStream);
- for (Map.Entry<Object, Object> entry : props.entrySet()) {
- result.put((String) entry.getKey(), (String) entry.getValue());
- }
- context = new Context(result);
- sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
- MockUtils.mockPulsarClient();
- } catch (Exception e) {
- LOG.error("fail to load properties, file ={}, and e= {}", "dataproxy-pulsar.conf", e);
- }
- }
-
- /**
- * testResult
- */
- @Test
- public void testResult() throws Exception {
- String workerName = "workerName";
- PulsarFederationSinkContext pulsarContext = new PulsarFederationSinkContext(MockUtils.SINK_ID, sinkContext);
- PulsarProducerFederation federation = new PulsarProducerFederation(workerName, pulsarContext);
- federation.start();
- Event event = MockUtils.mockEvent();
- boolean result = federation.send(event);
- assertTrue(result);
- }
-
-}