You are viewing a plain text version of this content. The link to its canonical (HTML) version was lost when this copy was converted to plain text.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/08/15 10:56:55 UTC
[inlong] branch master updated: [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink (#5545)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 27217e34b [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink (#5545)
27217e34b is described below
commit 27217e34ba57097019fe0ebdac013363e95e2cc2
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Aug 15 18:56:48 2022 +0800
[INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink (#5545)
---
.../dataproxy/config/pojo/MQClusterConfig.java | 13 +-
.../dataproxy/sink/SimpleMessageTubeSink.java | 34 +-
.../org/apache/inlong/dataproxy/sink/TubeSink.java | 881 ++++++++-------------
.../dataproxy/sink/common/MsgDedupHandler.java | 8 +
.../dataproxy/sink/common/TubeProducerHolder.java | 277 +++++++
.../inlong/dataproxy/sink/common/TubeUtils.java | 80 ++
6 files changed, 731 insertions(+), 562 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java
index b49234b18..f1a87708e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java
@@ -86,7 +86,7 @@ public class MQClusterConfig extends Context {
private static final int DEFAULT_MAX_TOPICS_EACH_PRODUCER_HOLD = 200;
private static final String TUBE_REQUEST_TIMEOUT = "tube_request_timeout";
- private static final int DEFAULT_TUBE_REQUEST_TIMEOUT = 60;
+ private static final long DEFAULT_TUBE_REQUEST_TIMEOUT = 20000L;
private static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = "link_max_allowed_delayed_msg_count";
private static final long DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 80000L;
@@ -100,6 +100,9 @@ public class MQClusterConfig extends Context {
private static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark";
private static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 1024 * 1024L;
+ private static final String HEARTBEAT_C2M_PERIOD_MS_MARK = "tube_heartbeat_period_ms";
+ private static final long DEFAULT_HEARTBEAT_C2M_PERIOD_MS = 15000L;
+
private static final String RECOVER_THREAD_COUNT = "recover_thread_count";
private static final int DEFAULT_RECOVER_THREAD_COUNT = Runtime.getRuntime().availableProcessors() + 1;
@@ -159,6 +162,10 @@ public class MQClusterConfig extends Context {
return getLong(NETTY_WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK);
}
+ public long getTubeHeartbeatPeriodMs() {
+ return getLong(HEARTBEAT_C2M_PERIOD_MS_MARK, DEFAULT_HEARTBEAT_C2M_PERIOD_MS);
+ }
+
public int getRecoverThreadCount() {
return getInteger(RECOVER_THREAD_COUNT, DEFAULT_RECOVER_THREAD_COUNT);
}
@@ -284,8 +291,8 @@ public class MQClusterConfig extends Context {
return getInteger(MAX_TOPICS_EACH_PRODUCER_HOLD, DEFAULT_MAX_TOPICS_EACH_PRODUCER_HOLD);
}
- public int getTubeRequestTimeout() {
- return getInteger(TUBE_REQUEST_TIMEOUT, DEFAULT_TUBE_REQUEST_TIMEOUT);
+ public long getTubeRpcTimeoutMs() {
+ return getLong(TUBE_REQUEST_TIMEOUT, DEFAULT_TUBE_REQUEST_TIMEOUT);
}
public String getLogTopic() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
index 990b44cac..521f3c24f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
@@ -43,12 +43,12 @@ import org.apache.flume.source.shaded.guava.RateLimiter;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
+import org.apache.inlong.dataproxy.sink.common.TubeUtils;
import org.apache.inlong.dataproxy.utils.Constants;
import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
@@ -57,7 +57,6 @@ import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
import org.slf4j.Logger;
@@ -346,6 +345,7 @@ public class SimpleMessageTubeSink extends AbstractSink implements Configurable
}
class SinkTask implements Runnable {
+
private void sendMessage(Event event, String topic, AtomicBoolean flag, EventStat es)
throws TubeClientException, InterruptedException {
if (msgDedupHandler.judgeDupAndPutMsgSeqId(
@@ -353,38 +353,12 @@ public class SimpleMessageTubeSink extends AbstractSink implements Configurable
logger.info("{} agent package {} existed,just discard.",
getName(), event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
} else {
- Message message = this.parseEvent2Message(topic, event);
- producer.sendMessage(message, new MyCallback(es));
+ producer.sendMessage(TubeUtils.buildMessage(
+ topic, event, true), new MyCallback(es));
flag.set(true);
}
illegalTopicMap.remove(topic);
}
-
- /**
- * parseEvent2Message
- * @param topic
- * @param event
- * @return
- */
- private Message parseEvent2Message(String topic, Event event) {
- Message message = new Message(topic, event.getBody());
- message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
- String streamId = "";
- if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
- streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
- } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
- streamId = event.getHeaders().get(AttributeConstants.INAME);
- }
- message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
- // common attributes
- Map<String, String> headers = event.getHeaders();
- message.setAttrKeyVal(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID));
- message.setAttrKeyVal(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID));
- message.setAttrKeyVal(Constants.TOPIC, headers.get(Constants.TOPIC));
- message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME));
- message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP));
- return message;
- }
private void handleException(Throwable t, String topic, boolean decrementFlag, EventStat es) {
if (t instanceof TubeClientException) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 00abce075..e49903329 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -17,20 +17,19 @@
package org.apache.inlong.dataproxy.sink;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Preconditions;
import org.apache.commons.collections.SetUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -42,382 +41,172 @@ import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.source.shaded.guava.RateLimiter;
import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
-import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
+import org.apache.inlong.dataproxy.sink.common.TubeProducerHolder;
+import org.apache.inlong.dataproxy.sink.common.TubeUtils;
import org.apache.inlong.dataproxy.utils.Constants;
-import org.apache.inlong.dataproxy.utils.NetworkUtils;
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
-import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
-import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TubeSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
- private static final MsgDedupHandler msgDedupHandler = new MsgDedupHandler();
- private static final ConcurrentHashMap<String, Long> illegalTopicMap = new ConcurrentHashMap<>();
+ private static final MsgDedupHandler MSG_DEDUP_HANDLER = new MsgDedupHandler();
+ private TubeProducerHolder producerHolder = null;
private static final String TOPIC = "topic";
- // key: masterUrl
- public Map<String, TubeMultiSessionFactory> sessionFactories;
- public Map<String, List<TopicProducerInfo>> masterUrl2producers;
- // key: topic
- public Map<String, List<TopicProducerInfo>> producerInfoMap;
private volatile boolean canTake = false;
private volatile boolean canSend = false;
+ private volatile boolean isOverFlow = false;
private ConfigManager configManager;
private Map<String, String> topicProperties;
private MQClusterConfig tubeConfig;
+ private String usedMasterAddr = null;
private Set<String> masterHostAndPortLists;
-
// used for RoundRobin different cluster while send message
- private AtomicInteger clusterIndex = new AtomicInteger(0);
- private LinkedBlockingQueue<EventStat> resendQueue;
- private LinkedBlockingQueue<Event> eventQueue;
private RateLimiter diskRateLimiter;
private Thread[] sinkThreadPool;
private Map<String, String> dimensions;
private DataProxyMetricItemSet metricItemSet;
-
- private boolean overflow = false;
-
- /**
- * diff publish
- */
- public void diffSetPublish(Set<String> originalSet, Set<String> endSet) {
- if (SetUtils.isEqualSet(originalSet, endSet)) {
- return;
- }
-
- boolean changed = false;
- Set<String> newTopics = new HashSet<>();
- for (String s : endSet) {
- if (!originalSet.contains(s)) {
- changed = true;
- newTopics.add(s);
- }
- }
-
- if (changed) {
- try {
- initTopicSet(newTopics);
- } catch (Exception e) {
- logger.info("meta sink publish new topic fail.", e);
- }
-
- logger.info("topics.properties has changed, trigger diff publish for {}", getName());
- topicProperties = configManager.getTopicProperties();
- }
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private static final LogCounter LOG_SINK_TASK_PRINTER =
+ new LogCounter(10, 100000, 60 * 1000);
+ private LinkedBlockingQueue<Event> eventQueue;
+ private LinkedBlockingQueue<EventStat> resendQueue;
+ private final AtomicLong cachedMsgCnt = new AtomicLong(0);
+ private final AtomicLong takenMsgCnt = new AtomicLong(0);
+ private final AtomicLong resendMsgCnt = new AtomicLong(0);
+ private final AtomicLong blankTopicDiscardMsgCnt = new AtomicLong(0);
+ private final AtomicLong frozenTopicDiscardMsgCnt = new AtomicLong(0);
+ private final AtomicLong dupDiscardMsgCnt = new AtomicLong(0);
+ private final AtomicLong inflightMsgCnt = new AtomicLong(0);
+ private final AtomicLong successMsgCnt = new AtomicLong(0);
+ // statistic thread
+ private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors
+ .newScheduledThreadPool(1, new HighPriorityThreadFactory("tubeSink-Printer-thread"));
+
+ {
+ SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(new TubeStatsTask(), 30L,
+ 60L, TimeUnit.SECONDS);
+ logger.info("success to start performance statistic task!");
}
- /**
- * when masterUrlLists change, update tubeClient
- *
- * @param originalCluster previous masterHostAndPortList set
- * @param endCluster new masterHostAndPortList set
- */
- public void diffUpdateTubeClient(Set<String> originalCluster, Set<String> endCluster) {
- if (SetUtils.isEqualSet(originalCluster, endCluster)) {
- return;
- }
- // close
- for (String masterUrl : originalCluster) {
-
- if (!endCluster.contains(masterUrl)) {
- // step1: close and remove all related producers
- List<TopicProducerInfo> producerInfoList = masterUrl2producers.get(masterUrl);
- if (producerInfoList != null) {
- for (TopicProducerInfo producerInfo : producerInfoList) {
- producerInfo.shutdown();
- // remove from topic<->producer map
- for (String topic : producerInfo.getTopicSet()) {
- List<TopicProducerInfo> curTopicProducers = producerInfoMap.get(topic);
- if (curTopicProducers != null) {
- curTopicProducers.remove(producerInfo);
- }
- }
- }
- // remove from masterUrl<->producer map
- masterUrl2producers.remove(masterUrl);
- }
-
- // step2: close and remove related sessionFactories
- TubeMultiSessionFactory sessionFactory = sessionFactories.get(masterUrl);
- if (sessionFactory != null) {
- try {
- sessionFactory.shutdown();
- } catch (TubeClientException e) {
- logger.error("destroy sessionFactory error in tubesink, MetaClientException {}",
- e.getMessage());
- }
- sessionFactories.remove(masterUrl);
- }
-
- logger.info("close tubeClient of masterList:{}", masterUrl);
- }
-
- }
- // start new client
- for (String masterUrl : endCluster) {
- if (!originalCluster.contains(masterUrl)) {
- TubeMultiSessionFactory sessionFactory = createConnection(masterUrl);
- if (sessionFactory != null) {
- List<Set<String>> topicGroups = partitionTopicSet(new HashSet<>(topicProperties.values()));
- for (Set<String> topicSet : topicGroups) {
- createTopicProducers(masterUrl, sessionFactory, topicSet);
- }
- logger.info("successfully start new tubeClient for the new masterList: {}", masterUrl);
- }
- }
- }
-
+ @Override
+ public void configure(Context context) {
+ logger.info(getName() + " configure from context: {}", context);
+ // initial parameters
+ configManager = ConfigManager.getInstance();
+ tubeConfig = configManager.getMqClusterConfig();
+ topicProperties = configManager.getTopicProperties();
masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet();
- }
-
- /**
- * when there are multi clusters, pick producer based on round-robin
- */
- private MessageProducer getProducer(String topic) throws TubeClientException {
- if (producerInfoMap.containsKey(topic) && !producerInfoMap.get(topic).isEmpty()) {
-
- List<TopicProducerInfo> producers = producerInfoMap.get(topic);
- // round-roubin dispatch
- int currentIndex = clusterIndex.getAndIncrement();
- if (currentIndex > Integer.MAX_VALUE / 2) {
- clusterIndex.set(0);
- }
- int producerIndex = currentIndex % producers.size();
- return producers.get(producerIndex).getProducer();
- }
- return null;
-// else {
-// synchronized (this) {
-// if (!producerInfoMap.containsKey(topic)) {
-// if (producer == null || currentPublishTopicNum.get() >= tubeConfig.getMaxTopicsEachProducerHold()) {
-// producer = sessionFactory.createProducer();
-// currentPublishTopicNum.set(0);
-// }
-// // publish topic
-// producer.publish(topic);
-// producerMap.put(topic, producer);
-// currentPublishTopicNum.incrementAndGet();
-// }
-// }
-// return producerMap.get(topic);
-// }
- }
-
- private TubeClientConfig initTubeConfig(String masterUrl) throws Exception {
- final TubeClientConfig tubeClientConfig = new TubeClientConfig(masterUrl);
- tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(tubeConfig.getLinkMaxAllowedDelayedMsgCount());
- tubeClientConfig.setSessionWarnDelayedMsgCount(tubeConfig.getSessionWarnDelayedMsgCount());
- tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(tubeConfig.getSessionMaxAllowedDelayedMsgCount());
- tubeClientConfig.setNettyWriteBufferHighWaterMark(tubeConfig.getNettyWriteBufferHighWaterMark());
- tubeClientConfig.setHeartbeatPeriodMs(15000L);
- tubeClientConfig.setRpcTimeoutMs(20000L);
-
- return tubeClientConfig;
- }
-
- /**
- * If this function is called successively without calling {@see #destroyConnection()}, only the
- * first call has any effect.
- *
- * @throws FlumeException if an RPC client connection could not be opened
- */
- private void initCreateConnection() throws FlumeException {
- // check the TubeMQ address
- if (masterHostAndPortLists == null || masterHostAndPortLists.isEmpty()) {
- logger.warn("Failed to get TubeMQ Cluster, make sure register TubeMQ to manager successfully.");
- return;
- }
- // if already connected, just skip
- if (sessionFactories != null) {
- return;
- }
- sessionFactories = new HashMap<>();
- for (String masterUrl : masterHostAndPortLists) {
- createConnection(masterUrl);
- }
-
- if (sessionFactories.size() == 0) {
- throw new FlumeException("create tube sessionFactories err, please re-check");
- }
- }
-
- private TubeMultiSessionFactory createConnection(String masterHostAndPortList) {
- TubeMultiSessionFactory sessionFactory;
- try {
- TubeClientConfig conf = initTubeConfig(masterHostAndPortList);
- sessionFactory = new TubeMultiSessionFactory(conf);
- sessionFactories.put(masterHostAndPortList, sessionFactory);
- } catch (Throwable e) {
- logger.error("connect to tube meta error, maybe tube master set error/shutdown, please re-check", e);
- throw new FlumeException("connect to tube meta error, maybe tube master set error/shutdown in progress, "
- + "please re-check");
- }
- return sessionFactory;
- }
-
- private void destroyConnection() {
- for (List<TopicProducerInfo> producerInfoList : producerInfoMap.values()) {
- for (TopicProducerInfo producerInfo : producerInfoList) {
- producerInfo.shutdown();
- }
- }
- producerInfoMap.clear();
-
- if (sessionFactories != null) {
- for (TubeMultiSessionFactory sessionFactory : sessionFactories.values()) {
- try {
- sessionFactory.shutdown();
- } catch (Exception e) {
- logger.error("destroy sessionFactory error in tubesink: ", e);
- }
- }
- }
- sessionFactories.clear();
- masterUrl2producers.clear();
- logger.debug("closed meta producer");
- }
-
- /**
- * partition topicSet to different group, each group is associated with a producer;
- * if there are multi clusters, then each group is associated with a set of producer
- */
- private List<Set<String>> partitionTopicSet(Set<String> topicSet) {
- List<Set<String>> topicGroups = new ArrayList<>();
-
- List<String> sortedList = new ArrayList<>(topicSet);
- Collections.sort(sortedList);
- int maxTopicsEachProducerHolder = tubeConfig.getMaxTopicsEachProducerHold();
- int cycle = sortedList.size() / maxTopicsEachProducerHolder;
- int remainder = sortedList.size() % maxTopicsEachProducerHolder;
-
- for (int i = 0; i <= cycle; i++) {
- // allocate topic
- Set<String> subset = new HashSet<>();
- int startIndex = i * maxTopicsEachProducerHolder;
- int endIndex = startIndex + maxTopicsEachProducerHolder - 1;
- if (i == cycle) {
- if (remainder == 0) {
- continue;
- } else {
- endIndex = startIndex + remainder - 1;
- }
- }
- for (int index = startIndex; index <= endIndex; index++) {
- subset.add(sortedList.get(index));
- }
-
- topicGroups.add(subset);
+ // start message deduplication handler
+ MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(),
+ tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize());
+ // only use first cluster address now
+ usedMasterAddr = getFirstClusterAddr(masterHostAndPortLists);
+ // create producer holder
+ producerHolder = new TubeProducerHolder(getName(),
+ usedMasterAddr, configManager.getMqClusterConfig());
+ // initial TubeMQ configure
+ // initial resend queue size
+ int badEventQueueSize = tubeConfig.getBadEventQueueSize();
+ Preconditions.checkArgument(badEventQueueSize > 0, "badEventQueueSize must be > 0");
+ resendQueue = new LinkedBlockingQueue<>(badEventQueueSize);
+ // initial sink thread pool
+ int threadNum = tubeConfig.getThreadNum();
+ Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
+ sinkThreadPool = new Thread[threadNum];
+ // initial event queue size
+ int eventQueueSize = tubeConfig.getEventQueueSize();
+ Preconditions.checkArgument(eventQueueSize > 0, "eventQueueSize must be > 0");
+ eventQueue = new LinkedBlockingQueue<>(eventQueueSize);
+ // initial disk rate limiter
+ if (tubeConfig.getDiskIoRatePerSec() != 0) {
+ diskRateLimiter = RateLimiter.create(tubeConfig.getDiskIoRatePerSec());
}
- return topicGroups;
- }
-
- /**
- * create producer and publish topic
- */
- private void createTopicProducers(String masterUrl, TubeMultiSessionFactory sessionFactory,
- Set<String> topicGroup) {
-
- TopicProducerInfo info = new TopicProducerInfo(sessionFactory);
- info.initProducer();
- Set<String> succTopicSet = info.publishTopic(topicGroup);
-
- masterUrl2producers.computeIfAbsent(masterUrl, k -> new ArrayList<>()).add(info);
-
- if (succTopicSet != null) {
- for (String succTopic : succTopicSet) {
- producerInfoMap.computeIfAbsent(succTopic, k -> new ArrayList<>()).add(info);
-
+ // register configure change callback functions
+ configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
+ @Override
+ public void update() {
+ diffSetPublish(new HashSet<>(topicProperties.values()),
+ new HashSet<>(configManager.getTopicProperties().values()));
}
- }
- }
-
- private void initTopicSet(Set<String> topicSet) throws Exception {
- long startTime = System.currentTimeMillis();
-
- if (sessionFactories != null) {
- List<Set<String>> topicGroups = partitionTopicSet(topicSet);
- for (Set<String> subset : topicGroups) {
- for (Map.Entry<String, TubeMultiSessionFactory> entry : sessionFactories.entrySet()) {
- createTopicProducers(entry.getKey(), entry.getValue(), subset);
- }
+ });
+ configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
+ @Override
+ public void update() {
+ diffUpdateTubeClient(masterHostAndPortLists,
+ configManager.getMqClusterUrl2Token().keySet());
}
- logger.info(getName() + " producer is ready for topics : " + producerInfoMap.keySet());
- logger.info(getName() + " initTopicSet cost: " + (System.currentTimeMillis() - startTime) + "ms");
- }
+ });
}
@Override
public void start() {
+ if (!this.started.compareAndSet(false, true)) {
+ logger.info("Duplicated call, " + getName() + " has started!");
+ return;
+ }
this.dimensions = new HashMap<>();
this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
// register metrics
this.metricItemSet = new DataProxyMetricItemSet(this.getName());
MetricRegister.register(metricItemSet);
-
// create tube connection
try {
- initCreateConnection();
+ producerHolder.start(new HashSet<>(topicProperties.values()));
} catch (FlumeException e) {
- logger.error("Unable to create tube client" + ". Exception follows.", e);
- // Try to prevent leaking resources
- destroyConnection();
- // FIXME: Mark ourselves as failed
- stop();
+ logger.error("Unable to start TubeMQ client. Exception follows.", e);
+ super.stop();
return;
}
-
// start the cleaner thread
-
super.start();
this.canSend = true;
this.canTake = true;
-
- try {
- initTopicSet(new HashSet<String>(topicProperties.values()));
- } catch (Exception e) {
- logger.info("meta sink start publish topic fail.", e);
- }
-
for (int i = 0; i < sinkThreadPool.length; i++) {
- sinkThreadPool[i] = new Thread(new SinkTask(), getName() + "_tube_sink_sender-" + i);
+ sinkThreadPool[i] = new Thread(new TubeSinkTask(),
+ getName() + "_tube_sink_sender-" + i);
sinkThreadPool[i].start();
}
-
+ logger.info(getName() + " started!");
}
- /**
- * resend event
- */
- private void resendEvent(EventStat es, boolean isDecrement) {
- try {
- if (es == null || es.getEvent() == null) {
- return;
+ @Override
+ public void stop() {
+ if (!this.started.compareAndSet(true, false)) {
+ logger.info("Duplicated call, " + getName() + " has stopped!");
+ return;
+ }
+ this.canTake = false;
+ if (sinkThreadPool != null) {
+ for (Thread thread : sinkThreadPool) {
+ if (thread == null) {
+ continue;
+ }
+ thread.interrupt();
}
- msgDedupHandler.invalidMsgSeqId(es.getEvent()
- .getHeaders().get(ConfigConstants.SEQUENCE_ID));
- } catch (Throwable throwable) {
- logger.error(getName() + " Discard msg because put events to both of queue and "
- + "fileChannel fail,current resendQueue.size = "
- + resendQueue.size(), throwable);
}
+ if (producerHolder != null) {
+ producerHolder.stop();
+ }
+ super.stop();
+ logger.info(getName() + " stopped!");
}
@Override
@@ -442,19 +231,20 @@ public class TubeSink extends AbstractSink implements Configurable {
} else {
dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
}
- if (!eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
- logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
- + "--> Tube,Check if Tube server or network is ok.(if this situation last long time "
- + "it will cause memoryChannel full and fileChannel write.)", getName());
- tx.rollback();
- // metric
+ if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
+ tx.commit();
+ cachedMsgCnt.incrementAndGet();
DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
- metricItem.readFailCount.incrementAndGet();
+ metricItem.readSuccessCount.incrementAndGet();
metricItem.readFailSize.addAndGet(event.getBody().length);
} else {
- tx.commit();
+ tx.rollback();
+ //logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
+ // + "--> TubeMQ, check if TubeMQ server or network is ok.(if this situation last long time "
+ // + "it will cause memoryChannel full and fileChannel write.)", getName());
+ // metric
DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
- metricItem.readSuccessCount.incrementAndGet();
+ metricItem.readFailCount.incrementAndGet();
metricItem.readFailSize.addAndGet(event.getBody().length);
}
} else {
@@ -475,196 +265,116 @@ public class TubeSink extends AbstractSink implements Configurable {
return status;
}
- @Override
- public void configure(Context context) {
- logger.info("configure from context: {}", context);
-
- configManager = ConfigManager.getInstance();
- topicProperties = configManager.getTopicProperties();
- masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet();
- tubeConfig = configManager.getMqClusterConfig();
- configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
- @Override
- public void update() {
- diffSetPublish(new HashSet<>(topicProperties.values()),
- new HashSet<>(configManager.getTopicProperties().values()));
- }
- });
- configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
- @Override
- public void update() {
- diffUpdateTubeClient(masterHostAndPortLists, configManager.getMqClusterUrl2Token().keySet());
- }
- });
-
- producerInfoMap = new ConcurrentHashMap<>();
- masterUrl2producers = new ConcurrentHashMap<>();
- // start message deduplication handler
- msgDedupHandler.start(tubeConfig.getClientIdCache(),
- tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize());
-
- int badEventQueueSize = tubeConfig.getBadEventQueueSize();
- Preconditions.checkArgument(badEventQueueSize > 0, "badEventQueueSize must be > 0");
- resendQueue = new LinkedBlockingQueue<>(badEventQueueSize);
-
- int threadNum = tubeConfig.getThreadNum();
- Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
- sinkThreadPool = new Thread[threadNum];
- int eventQueueSize = tubeConfig.getEventQueueSize();
- Preconditions.checkArgument(eventQueueSize > 0, "eventQueueSize must be > 0");
- eventQueue = new LinkedBlockingQueue<>(eventQueueSize);
-
- if (tubeConfig.getDiskIoRatePerSec() != 0) {
- diskRateLimiter = RateLimiter.create(tubeConfig.getDiskIoRatePerSec());
- }
-
- }
-
- private Map<String, String> getNewDimension(String otherKey, String value) {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
- dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
- dimensions.put(otherKey, value);
- return dimensions;
- }
-
- /**
- * get metricItemSet
- *
- * @return the metricItemSet
- */
- public DataProxyMetricItemSet getMetricItemSet() {
- return metricItemSet;
- }
-
- class SinkTask implements Runnable {
-
- private void sendMessage(MessageProducer producer, Event event,
- String topic, AtomicBoolean flag, EventStat es)
- throws TubeClientException, InterruptedException {
- if (msgDedupHandler.judgeDupAndPutMsgSeqId(
- event.getHeaders().get(ConfigConstants.SEQUENCE_ID))) {
- logger.info("{} agent package {} existed,just discard.",
- getName(), event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
- } else {
- Message message = new Message(topic, event.getBody());
- message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
- String streamId = "";
- if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
- streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
- } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
- streamId = event.getHeaders().get(AttributeConstants.INAME);
- }
- message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
- producer.sendMessage(message, new MyCallback(es));
- flag.set(true);
- }
- illegalTopicMap.remove(topic);
- }
-
- private void handleException(Throwable t, String topic, boolean decrementFlag, EventStat es) {
- if (t instanceof TubeClientException) {
- String message = t.getMessage();
- if (message != null && (message.contains("No available queue for topic")
- || message.contains("The brokers of topic are all forbidden"))) {
- illegalTopicMap.put(topic, System.currentTimeMillis() + 60 * 1000);
- logger.info("IllegalTopicMap.put " + topic);
- return;
- } else {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- //ignore..
- }
- }
- }
- logger.error("Sink task fail to send the message, decrementFlag=" + decrementFlag + ",sink.name="
- + Thread.currentThread().getName()
- + ",event.headers=" + es.getEvent().getHeaders(), t);
+ /**
+ * Sink worker: takes events from resendQueue first, otherwise from eventQueue
+ * (2s poll), and sends them to TubeMQ through the current producer holder.
+ * The loop ends when canSend is cleared, or when a poll times out while
+ * canTake is false and no taken message is still outstanding.
+ */
+ private class TubeSinkTask implements Runnable {
+ public TubeSinkTask() {
+ // ignore
}
@Override
public void run() {
+ Event event = null;
+ EventStat es = null;
+ String topic = null;
+ boolean sendFinished = false;
logger.info("sink task {} started.", Thread.currentThread().getName());
while (canSend) {
- boolean decrementFlag = false;
- boolean resendBadEvent = false;
- Event event = null;
- EventStat es = null;
- String topic = null;
try {
- if (TubeSink.this.overflow) {
- TubeSink.this.overflow = false;
- Thread.sleep(10);
+ if (isOverFlow) {
+ // back off briefly after the broker side reported it cannot accept the topic
+ isOverFlow = false;
+ Thread.sleep(30);
}
+ event = null;
+ topic = null;
+ // reset per iteration: a stale "true" from an earlier successful send would
+ // make resendEvent() decrement inflightMsgCnt for an event never handed over
+ sendFinished = false;
+ // get event from queues
if (!resendQueue.isEmpty()) {
es = resendQueue.poll();
- if (es != null) {
- event = es.getEvent();
- // logger.warn("Resend event: {}", event.toString());
- if (event.getHeaders().containsKey(TOPIC)) {
- topic = event.getHeaders().get(TOPIC);
- }
- resendBadEvent = true;
+ if (es == null) {
+ // another worker drained the queue between isEmpty() and poll()
+ continue;
+ }
+ resendMsgCnt.decrementAndGet();
+ event = es.getEvent();
+ if (event.getHeaders().containsKey(TOPIC)) {
+ topic = event.getHeaders().get(TOPIC);
}
} else {
- event = eventQueue.take();
+ event = eventQueue.poll(2000, TimeUnit.MILLISECONDS);
+ if (event == null) {
+ if (!canTake && takenMsgCnt.get() <= 0) {
+ logger.info("Found canTake is false and taken message count is zero, break!");
+ break;
+ }
+ continue;
+ }
+ cachedMsgCnt.decrementAndGet();
+ takenMsgCnt.incrementAndGet();
es = new EventStat(event);
-// sendCnt.incrementAndGet();
if (event.getHeaders().containsKey(TOPIC)) {
topic = event.getHeaders().get(TOPIC);
}
}
-
- if (event == null) {
- // ignore event is null, when multiple-thread SinkTask running
- // this null value comes from resendQueue
- continue;
- }
-
- if (topic == null || topic.equals("")) {
- logger.warn("no topic specified in event header, just skip this event");
- continue;
- }
-
- Long expireTime = illegalTopicMap.get(topic);
- if (expireTime != null) {
- long currentTime = System.currentTimeMillis();
- if (expireTime > currentTime) {
- // TODO: need to be improved.
-// reChannelEvent(es, topic);
- continue;
- } else {
- illegalTopicMap.remove(topic);
+ // valid event status: an event without a topic cannot be routed, drop it
+ if (StringUtils.isBlank(topic)) {
+ blankTopicDiscardMsgCnt.incrementAndGet();
+ takenMsgCnt.decrementAndGet();
+ if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+ logger.error("No topic specified, just discard the event, event header is "
+ + event.getHeaders().toString());
}
- }
- MessageProducer producer = null;
- try {
- producer = getProducer(topic);
- } catch (Exception e) {
- logger.error("Get producer failed!", e);
- }
-
- if (producer == null) {
- illegalTopicMap.put(topic, System.currentTimeMillis() + 30 * 1000);
continue;
}
-
- AtomicBoolean flagAtomic = new AtomicBoolean(decrementFlag);
- sendMessage(producer, event, topic, flagAtomic, es);
- decrementFlag = flagAtomic.get();
+ // send message
+ sendFinished = sendMessage(es, event, topic);
} catch (InterruptedException e) {
logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
+ // preserve the interrupt status for whoever runs this task
+ Thread.currentThread().interrupt();
return;
} catch (Throwable t) {
- handleException(t, topic, decrementFlag, es);
- resendEvent(es, decrementFlag);
+ resendEvent(es, sendFinished);
+ if (t instanceof TubeClientException) {
+ String message = t.getMessage();
+ // both wordings are checked: TubeProducerHolder.needFrozenSent() uses "partition"
+ if (message != null && (message.contains("No available queue for topic")
+ || message.contains("No available partition for topic")
+ || message.contains("The brokers of topic are all forbidden"))) {
+ isOverFlow = true;
+ }
+ }
+ if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+ logger.error("Sink task fail to send the message, finished =" + sendFinished
+ + ",sink.name=" + Thread.currentThread().getName()
+ + ",event.headers=" + (event == null ? null : event.getHeaders()), t);
+ }
}
}
+ logger.info("sink task {} stopped!", Thread.currentThread().getName());
+ }
+
+ /**
+ * Send one event to TubeMQ.
+ *
+ * @param es the event wrapper handed to the send callback
+ * @param event the event to send
+ * @param topic the target topic, non-blank
+ * @return true if the message was handed to the producer (and counted in
+ * inflightMsgCnt); false if it was discarded because no producer is
+ * available for the topic or its sequence id was judged a duplicate
+ * @throws Exception if obtaining the producer or sending the message fails
+ */
+ private boolean sendMessage(EventStat es, Event event, String topic) throws Exception {
+ MessageProducer producer = producerHolder.getProducer(topic);
+ if (producer == null) {
+ // the holder returns null while the topic is frozen
+ frozenTopicDiscardMsgCnt.incrementAndGet();
+ takenMsgCnt.decrementAndGet();
+ if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+ logger.error("Get producer failed for " + topic);
+ }
+ return false;
+ }
+ if (MSG_DEDUP_HANDLER.judgeDupAndPutMsgSeqId(
+ event.getHeaders().get(ConfigConstants.SEQUENCE_ID))) {
+ dupDiscardMsgCnt.incrementAndGet();
+ takenMsgCnt.decrementAndGet();
+ logger.info("{} agent package {} existed,just discard.",
+ Thread.currentThread().getName(),
+ event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
+ return false;
+ } else {
+ producer.sendMessage(TubeUtils.buildMessage(
+ topic, event, false), new MyCallback(es));
+ inflightMsgCnt.incrementAndGet();
+ return true;
+ }
}
}
- public class MyCallback implements MessageSentCallback {
+ private class MyCallback implements MessageSentCallback {
private EventStat myEventStat;
private long sendTime;
@@ -677,7 +387,9 @@ public class TubeSink extends AbstractSink implements Configurable {
@Override
public void onMessageSent(final MessageSentResult result) {
if (result.isSuccess()) {
- // TODO: add stats
+ successMsgCnt.incrementAndGet();
+ inflightMsgCnt.decrementAndGet();
+ takenMsgCnt.decrementAndGet();
this.addMetric(myEventStat.getEvent(), true, sendTime);
} else {
this.addMetric(myEventStat.getEvent(), false, 0);
@@ -685,10 +397,9 @@ public class TubeSink extends AbstractSink implements Configurable {
logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
result.getErrMsg(), resendQueue.size(),
myEventStat.getEvent().hashCode());
-
return;
- }
- if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW) {
+ } else if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW
+ && LOG_SINK_TASK_PRINTER.shouldPrint()) {
logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
result.getErrMsg(), resendQueue.size(),
myEventStat.getEvent().hashCode());
@@ -731,65 +442,177 @@ public class TubeSink extends AbstractSink implements Configurable {
@Override
public void onException(final Throwable e) {
- Throwable t = e;
- while (t.getCause() != null) {
- t = t.getCause();
- }
- if (t instanceof OverflowException) {
- TubeSink.this.overflow = true;
- }
+ // asynchronous send failure: the event was already counted in inflightMsgCnt when it
+ // was handed to the producer, so pass sendFinished=true and let resendEvent() undo that
+ // count before re-queuing the event.
+ // NOTE(review): unlike the removed code, an async OverflowException no longer sets the
+ // isOverFlow back-off flag; confirm this is intended.
resendEvent(myEventStat, true);
}
}
- class TopicProducerInfo {
-
- private TubeMultiSessionFactory sessionFactory;
- private MessageProducer producer;
- private Set<String> topicSet;
+ /**
+ * Runnable that writes this sink's message counters to the log in a single line.
+ * It logs nothing once canTake is false and no taken message is outstanding.
+ * NOTE(review): presumably scheduled periodically by the sink; the scheduling
+ * code is not visible here.
+ */
+ private class TubeStatsTask implements Runnable {
- public TopicProducerInfo(TubeMultiSessionFactory sessionFactory) {
- this.sessionFactory = sessionFactory;
+ @Override
+ public void run() {
+ // nothing left to report once intake has stopped and everything is accounted for
+ if (!canTake && takenMsgCnt.get() <= 0) {
+ return;
+ }
+ logger.info(getName() + "[TubeSink Stats] cachedMsgCnt=" + cachedMsgCnt.get()
+ + ", takenMsgCnt=" + takenMsgCnt.get()
+ + ", resendMsgCnt=" + resendMsgCnt.get()
+ + ", blankTopicDiscardMsgCnt=" + blankTopicDiscardMsgCnt.get()
+ + ", frozenTopicDiscardMsgCnt=" + frozenTopicDiscardMsgCnt.get()
+ + ", dupDiscardMsgCnt=" + dupDiscardMsgCnt.get()
+ + ", inflightMsgCnt=" + inflightMsgCnt.get()
+ + ", successMsgCnt=" + successMsgCnt.get());
}
+ }
- public void shutdown() {
- if (producer != null) {
- try {
- producer.shutdown();
- } catch (Throwable e) {
- logger.error("destroy producer error in tube sink", e);
+ /**
+ * Hand a failed event back for another send attempt.
+ *
+ * Undoes the in-flight count if the event had been handed to the producer,
+ * invalidates its sequence id in the dedup handler (presumably so the retry is not
+ * discarded as a duplicate), then offers it to resendQueue. If the queue rejects
+ * it, the event is passed to the failover channel processor instead and leaves
+ * this sink's taken-message accounting.
+ *
+ * @param es the failed event wrapper; when it or its event is null, only the
+ * taken-message count is decremented
+ * @param sendFinished true if the event was already handed to the producer and
+ * therefore counted in inflightMsgCnt
+ */
+ private void resendEvent(EventStat es, boolean sendFinished) {
+ try {
+ if (sendFinished) {
+ inflightMsgCnt.decrementAndGet();
+ }
+ if (es == null || es.getEvent() == null) {
+ takenMsgCnt.decrementAndGet();
+ return;
+ }
+ MSG_DEDUP_HANDLER.invalidMsgSeqId(es.getEvent()
+ .getHeaders().get(ConfigConstants.SEQUENCE_ID));
+ if (resendQueue.offer(es)) {
+ // still "taken": the worker decrements resendMsgCnt when it polls it back
+ resendMsgCnt.incrementAndGet();
+ } else {
+ FailoverChannelProcessorHolder.getChannelProcessor().processEvent(es.getEvent());
+ takenMsgCnt.decrementAndGet();
+ if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+ logger.error(Thread.currentThread().getName()
+ + " Channel --> Tube --> ResendQueue(full) -->"
+ + "FailOverChannelProcessor(current code point),"
+ + " Resend queue is full,Check if Tube server or network is ok.");
}
}
+ } catch (Throwable throwable) {
+ // the event is dropped, so it no longer counts as taken
+ takenMsgCnt.decrementAndGet();
+ if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+ logger.error(getName() + " Discard msg because put events to both of queue and "
+ + "fileChannel fail,current resendQueue.size = "
+ + resendQueue.size(), throwable);
+ }
}
+ }
- public void initProducer() {
- if (sessionFactory == null) {
- logger.error("sessionFactory is null, can't create producer");
- return;
+ /**
+ * Build a metric dimension map for this sink: the fixed cluster id "DataProxy",
+ * this sink's name, and one caller-supplied key/value pair, which is put last
+ * and therefore wins if it collides with either fixed key.
+ *
+ * @param otherKey the extra dimension key
+ * @param value the extra dimension value
+ * @return a new mutable map holding the dimensions
+ */
+ private Map<String, String> getNewDimension(String otherKey, String value) {
+ final Map<String, String> dimensionMap = new HashMap<>();
+ dimensionMap.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
+ dimensionMap.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
+ dimensionMap.put(otherKey, value);
+ return dimensionMap;
+ }
+
+ /**
+ * Differentiate unpublished topic sets and publish them.
+ * Attention: only topics that are new are published; removed topics are
+ * left untouched. Does nothing if the sink is not started.
+ *
+ * @param curTopicSet the current used topic set, may be null
+ * @param newTopicSet the latest configured topic set, may be null
+ */
+ private void diffSetPublish(Set<String> curTopicSet, Set<String> newTopicSet) {
+ if (!this.started.get()) {
+ logger.info(getName() + " not started, ignore this change!");
+ // the log says the change is ignored; without this return the method went on
+ return;
+ }
+ if (newTopicSet == null || SetUtils.isEqualSet(curTopicSet, newTopicSet)) {
+ return;
+ }
+ // filter unpublished topics
+ Set<String> addedTopics = new HashSet<>();
+ for (String topic : newTopicSet) {
+ if (StringUtils.isBlank(topic)) {
+ continue;
}
- try {
- this.producer = sessionFactory.createProducer();
- } catch (TubeClientException e) {
- logger.error("create tube messageProducer error in tubesink, ex {}", e.getMessage());
+ if (curTopicSet == null || !curTopicSet.contains(topic)) {
+ addedTopics.add(topic);
}
}
-
- public Set<String> publishTopic(Set<String> topicSet) {
+ // publish them
+ if (!addedTopics.isEmpty()) {
try {
- this.topicSet = producer.publish(topicSet);
- } catch (TubeClientException e) {
- logger.info(getName() + " meta sink initTopicSet fail.", e);
+ producerHolder.createProducersByTopicSet(addedTopics);
+ } catch (Exception e) {
+ // not fatal: topics not published here are still published lazily on first send
+ logger.warn(getName() + "'s publish new topic set fail.", e);
}
- return this.topicSet;
+ logger.info(getName() + "'s topics set has changed, trigger diff publish for {}",
+ addedTopics);
+ topicProperties = configManager.getTopicProperties();
}
+ }
- public MessageProducer getProducer() {
- return producer;
+ /**
+ * When masterUrlLists change, update tubeClient.
+ * Requirement: when switching the Master cluster,
+ * the DataProxy node must not do the data reporting service.
+ * A new producer holder is started against the first non-blank address of
+ * the new set; only if that succeeds does it replace the current holder,
+ * which is then stopped. Does nothing if the sink is not started.
+ *
+ * @param curClusterSet previous masterHostAndPortList set
+ * @param newClusterSet new masterHostAndPortList set
+ */
+ private void diffUpdateTubeClient(Set<String> curClusterSet,
+ Set<String> newClusterSet) {
+ if (!this.started.get()) {
+ logger.info(getName() + " not started, ignore this change!");
+ // without this return, a null producerHolder would be stopped below (NPE)
+ return;
+ }
+ if (newClusterSet == null || newClusterSet.isEmpty()
+ || SetUtils.isEqualSet(curClusterSet, newClusterSet)
+ || newClusterSet.contains(usedMasterAddr)) {
+ return;
+ }
+ String newMasterAddr = getFirstClusterAddr(newClusterSet);
+ if (newMasterAddr == null) {
+ return;
+ }
+ TubeProducerHolder newProducerHolder = new TubeProducerHolder(getName(),
+ newMasterAddr, configManager.getMqClusterConfig());
+ try {
+ newProducerHolder.start(new HashSet<>(configManager.getTopicProperties().values()));
+ } catch (Throwable e) {
+ logger.error(getName() + " create new producer holder for " + newMasterAddr
+ + " failure", e);
+ return;
}
+ // replace current producer holder
+ // NOTE(review): sink worker threads read producerHolder; confirm the field is volatile
+ final String tmpMasterAddr = usedMasterAddr;
+ TubeProducerHolder tmpProducerHolder = producerHolder;
+ producerHolder = newProducerHolder;
+ usedMasterAddr = newMasterAddr;
+ // close old producer holder
+ if (tmpProducerHolder != null) {
+ tmpProducerHolder.stop();
+ }
+ logger.info(getName() + " switch cluster from "
+ + tmpMasterAddr + " to " + usedMasterAddr);
+ }
- public Set<String> getTopicSet() {
- return this.topicSet;
+ /**
+ * Pick the first usable address from a cluster address set.
+ *
+ * @param clusterSet cluster set configure
+ * @return the first non-blank address in the set's iteration order,
+ * or null if the set is empty or every item is blank
+ */
+ private String getFirstClusterAddr(Set<String> clusterSet) {
+ for (String candidate : clusterSet) {
+ if (StringUtils.isNotBlank(candidate)) {
+ return candidate;
+ }
}
+ return null;
+ }
+
+ /**
+ * Get the metric item set held by this sink.
+ *
+ * @return the metricItemSet field (no copy is made)
+ */
+ private DataProxyMetricItemSet getMetricItemSet() {
+ return metricItemSet;
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/MsgDedupHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/MsgDedupHandler.java
index 297fbc769..ba3dec596 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/MsgDedupHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/MsgDedupHandler.java
@@ -19,6 +19,7 @@ package org.apache.inlong.dataproxy.sink.common;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
+import com.google.common.cache.CacheStats;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -95,4 +96,11 @@ public class MsgDedupHandler {
}
return "Disable for message data deduplication function";
}
+
+ /**
+ * Get the statistics of the message sequence-id cache.
+ *
+ * @return the cache statistics, or null when data deduplication is disabled
+ */
+ public CacheStats getCacheData() {
+ return enableDataDedup ? msgSeqIdCache.stats() : null;
+ }
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
new file mode 100644
index 000000000..c1852fed3
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.common;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flume.FlumeException;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds the TubeMQ session factory and the message producers of one sink for
+ * one cluster address. Topics are spread over producers so that each producer
+ * publishes at most the configured number of topics; topics whose publish or
+ * send fails are frozen for a while.
+ */
+public class TubeProducerHolder {
+ private static final Logger logger =
+ LoggerFactory.getLogger(TubeProducerHolder.class);
+ // how long (ms) a topic stays frozen after a send failure / a publish failure
+ private static final long SEND_FAILURE_WAIT = 30000L;
+ private static final long PUBLISH_FAILURE_WAIT = 60000L;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final String sinkName;
+ private final String clusterAddr;
+ private final MQClusterConfig clusterConfig;
+ // volatile: read by sink threads in getProducer() while start()/stop() may change it
+ private volatile TubeMultiSessionFactory sessionFactory = null;
+ private final Map<String, MessageProducer> producerMap = new ConcurrentHashMap<>();
+ // the producer new topics are published on, and how many topics it holds;
+ // both are only read or written while holding the lastPubTopicCnt monitor
+ private MessageProducer lastProducer = null;
+ private final AtomicInteger lastPubTopicCnt = new AtomicInteger(0);
+ // topic name -> time (ms) until which the topic is frozen.
+ // NOTE(review): static, so it is shared by every holder in the JVM and cleared by any holder's stop()
+ private static final ConcurrentHashMap<String, AtomicLong> FROZEN_TOPIC_MAP
+ = new ConcurrentHashMap<>();
+
+ /**
+ * Create a producer holder; nothing is connected until start() is called.
+ *
+ * @param sinkName the owning sink's name, used in log messages
+ * @param clusterAddr the TubeMQ cluster address, must not be blank
+ * @param tubeConfig the TubeMQ cluster configure
+ * @throws IllegalStateException if clusterAddr is blank
+ */
+ public TubeProducerHolder(String sinkName, String clusterAddr, MQClusterConfig tubeConfig) {
+ Preconditions.checkState(StringUtils.isNotBlank(clusterAddr),
+ "No TubeMQ's cluster address list specified");
+ this.sinkName = sinkName;
+ this.clusterAddr = clusterAddr;
+ this.clusterConfig = tubeConfig;
+ }
+
+ /**
+ * Create the session factory and publish the configured topics.
+ * Calling it while already started only logs. On failure everything created
+ * so far is released through stop() and a FlumeException is thrown.
+ *
+ * @param configTopicSet the topics to publish up front, may be null or empty
+ * @throws FlumeException if the session factory or producers cannot be created
+ */
+ public void start(Set<String> configTopicSet) {
+ if (!this.started.compareAndSet(false, true)) {
+ logger.info("ProducerHolder for " + sinkName + " has started!");
+ return;
+ }
+ logger.info("ProducerHolder for " + sinkName + " begin to start!");
+ // create session factory
+ try {
+ TubeClientConfig clientConfig = TubeUtils.buildClientConfig(clusterAddr, this.clusterConfig);
+ this.sessionFactory = new TubeMultiSessionFactory(clientConfig);
+ createProducersByTopicSet(configTopicSet);
+ } catch (Throwable e) {
+ // started is still true here, so stop() really releases the partial state
+ stop();
+ String errInfo = "Build session factory to " + clusterAddr
+ + " for " + sinkName + " failure, please re-check";
+ logger.error(errInfo, e);
+ throw new FlumeException(errInfo, e);
+ }
+ logger.info("ProducerHolder for " + sinkName + " started!");
+ }
+
+ /**
+ * Shut down all producers and the session factory, and clear the frozen topics.
+ * Only the call that flips the started flag from true to false does the work;
+ * shutdown errors are ignored so that every resource gets a chance to close.
+ */
+ public void stop() {
+ // change start flag
+ if (!this.started.compareAndSet(true, false)) {
+ logger.info("ProducerHolder for " + sinkName + " has stopped!");
+ return;
+ }
+ logger.info("ProducerHolder for " + sinkName + " begin to stop!");
+ Set<MessageProducer> producers;
+ TubeMultiSessionFactory factory;
+ synchronized (lastPubTopicCnt) {
+ // several topics share one producer, so collect the distinct instances
+ producers = new HashSet<>(producerMap.values());
+ // lastProducer may hold no topic (its publish failed) and so be absent from the map
+ if (lastProducer != null) {
+ producers.add(lastProducer);
+ }
+ producerMap.clear();
+ lastProducer = null;
+ lastPubTopicCnt.set(0);
+ factory = sessionFactory;
+ sessionFactory = null;
+ }
+ FROZEN_TOPIC_MAP.clear();
+ for (MessageProducer producer : producers) {
+ try {
+ producer.shutdown();
+ } catch (Throwable e) {
+ // best effort: keep shutting down the remaining producers
+ }
+ }
+ if (factory != null) {
+ try {
+ factory.shutdown();
+ } catch (Throwable e) {
+ // best effort: the holder is stopped regardless
+ }
+ }
+ logger.info("ProducerHolder for " + sinkName + " finished stop!");
+ }
+
+ /**
+ * Get producer by topic name:
+ * i. if the topic is frozen and the freeze has not expired, return null;
+ * ii. otherwise:
+ * a. if the topic has been published before, return the corresponding producer directly;
+ * b. if not, publish the topic on the current producer (creating a new producer
+ * when the current one is full). If publishing throws, the topic is frozen.
+ *
+ * @param topicName the topic name
+ *
+ * @return the producer, or null if the topic is frozen or its publish failed
+ * @throws TubeClientException if the holder is not started or a producer cannot be created
+ */
+ public MessageProducer getProducer(String topicName) throws TubeClientException {
+ AtomicLong fbdTime = FROZEN_TOPIC_MAP.get(topicName);
+ if (fbdTime != null && fbdTime.get() > System.currentTimeMillis()) {
+ return null;
+ }
+ MessageProducer tmpProducer = producerMap.get(topicName);
+ if (tmpProducer != null) {
+ if (fbdTime != null) {
+ // the freeze has expired
+ FROZEN_TOPIC_MAP.remove(topicName);
+ }
+ return tmpProducer;
+ }
+ synchronized (lastPubTopicCnt) {
+ // re-check under the lock: another thread may have frozen or published it meanwhile
+ fbdTime = FROZEN_TOPIC_MAP.get(topicName);
+ if (fbdTime != null && fbdTime.get() > System.currentTimeMillis()) {
+ return null;
+ }
+ tmpProducer = producerMap.get(topicName);
+ if (tmpProducer != null) {
+ return tmpProducer;
+ }
+ TubeMultiSessionFactory factory = sessionFactory;
+ if (factory == null) {
+ throw new TubeClientException("ProducerHolder for " + sinkName + " is not started!");
+ }
+ if (lastProducer == null
+ || lastPubTopicCnt.get() >= maxTopicsPerProducer()) {
+ lastProducer = factory.createProducer();
+ lastPubTopicCnt.set(0);
+ }
+ try {
+ lastProducer.publish(topicName);
+ } catch (Throwable e) {
+ freezeTopic(topicName, PUBLISH_FAILURE_WAIT);
+ logger.warn("Throw exception while publish topic="
+ + topicName + ", exception is " + e.getMessage());
+ return null;
+ }
+ producerMap.put(topicName, lastProducer);
+ lastPubTopicCnt.incrementAndGet();
+ return lastProducer;
+ }
+ }
+
+ /**
+ * Whether frozen production according to the exceptions returned by message sending.
+ * If the exception says the topic has no writable partition/queue or all of its
+ * brokers are forbidden, the topic is frozen for SEND_FAILURE_WAIT ms.
+ *
+ * @param topicName the topic name sent message
+ * @param throwable the exception information thrown when sending a message
+ *
+ * @return true if the topic was frozen
+ */
+ public boolean needFrozenSent(String topicName, Throwable throwable) {
+ if (throwable instanceof TubeClientException) {
+ String message = throwable.getMessage();
+ if (message != null && (message.contains("No available partition for topic")
+ || message.contains("No available queue for topic")
+ || message.contains("The brokers of topic are all forbidden"))) {
+ freezeTopic(topicName, SEND_FAILURE_WAIT);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Create sink producers by configured topic set.
+ * Unpublished topics are sorted and published in chunks: the first chunk tops up
+ * the current producer, and each further chunk goes to a new producer holding at
+ * most the configured number of topics.
+ *
+ * @param cfgTopicSet the configured topic set, may be null or empty
+ * @throws Exception if the holder is not started, or creating/publishing fails;
+ * chunks published before the failure stay registered
+ */
+ public void createProducersByTopicSet(Set<String> cfgTopicSet) throws Exception {
+ if (cfgTopicSet == null || cfgTopicSet.isEmpty()) {
+ return;
+ }
+ long startTime = System.currentTimeMillis();
+ int maxPublishTopicCnt = maxTopicsPerProducer();
+ // same lock as getProducer(): both mutate lastProducer / lastPubTopicCnt / producerMap
+ synchronized (lastPubTopicCnt) {
+ TubeMultiSessionFactory factory = sessionFactory;
+ if (factory == null) {
+ throw new TubeClientException("ProducerHolder for " + sinkName + " is not started!");
+ }
+ // filter published topics
+ List<String> filteredTopics = new ArrayList<>(cfgTopicSet.size());
+ for (String topicName : cfgTopicSet) {
+ if (StringUtils.isBlank(topicName)
+ || producerMap.containsKey(topicName)) {
+ continue;
+ }
+ filteredTopics.add(topicName);
+ }
+ if (filteredTopics.isEmpty()) {
+ return;
+ }
+ Collections.sort(filteredTopics);
+ int startPos = 0;
+ while (startPos < filteredTopics.size()) {
+ // create producer when there is none or the current one is full
+ if (lastProducer == null
+ || lastPubTopicCnt.get() >= maxPublishTopicCnt) {
+ lastProducer = factory.createProducer();
+ lastPubTopicCnt.set(0);
+ }
+ // allocate as many topics as the current producer can still take
+ int endPos = Math.min(filteredTopics.size(),
+ startPos + (maxPublishTopicCnt - lastPubTopicCnt.get()));
+ // a fresh set per chunk, in case publish() keeps a reference to it
+ Set<String> subTopicSet = new HashSet<>(filteredTopics.subList(startPos, endPos));
+ lastProducer.publish(subTopicSet);
+ lastPubTopicCnt.addAndGet(subTopicSet.size());
+ for (String topicItem : subTopicSet) {
+ producerMap.put(topicItem, lastProducer);
+ }
+ startPos = endPos;
+ }
+ }
+ logger.info(sinkName + " initializes producers for topics:"
+ + producerMap.keySet() + ", cost: " + (System.currentTimeMillis() - startTime)
+ + "ms");
+ }
+
+ /**
+ * Configured maximum number of topics per producer, at least 1 so that
+ * a non-positive setting cannot make topic allocation loop forever.
+ */
+ private int maxTopicsPerProducer() {
+ return Math.max(1, clusterConfig.getMaxTopicsEachProducerHold());
+ }
+
+ /**
+ * Freeze a topic until now + waitMs, creating its entry if needed.
+ */
+ private static void freezeTopic(String topicName, long waitMs) {
+ FROZEN_TOPIC_MAP.computeIfAbsent(topicName, k -> new AtomicLong(0))
+ .set(System.currentTimeMillis() + waitMs);
+ }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
new file mode 100644
index 000000000..6a67978f8
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.common;
+
+import java.util.Map;
+import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.NetworkUtils;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.corebase.Message;
+
+/**
+ * Helpers for building TubeMQ client configurations and messages.
+ */
+public class TubeUtils {
+
+ /**
+ * Create a TubeMQ client configuration for the given cluster address,
+ * copying the delayed-message thresholds, Netty write-buffer high water mark,
+ * heartbeat period and RPC timeout from the DataProxy-side configuration.
+ *
+ * @param clusterAddr the TubeMQ cluster address
+ * @param tubeConfig the TubeMQ cluster configure
+ * @return the populated TubeClientConfig object
+ */
+ public static TubeClientConfig buildClientConfig(String clusterAddr, MQClusterConfig tubeConfig) {
+ TubeClientConfig clientConfig = new TubeClientConfig(clusterAddr);
+ // delayed-message thresholds
+ clientConfig.setLinkMaxAllowedDelayedMsgCount(tubeConfig.getLinkMaxAllowedDelayedMsgCount());
+ clientConfig.setSessionWarnDelayedMsgCount(tubeConfig.getSessionWarnDelayedMsgCount());
+ clientConfig.setSessionMaxAllowedDelayedMsgCount(tubeConfig.getSessionMaxAllowedDelayedMsgCount());
+ // network and liveness settings
+ clientConfig.setNettyWriteBufferHighWaterMark(tubeConfig.getNettyWriteBufferHighWaterMark());
+ clientConfig.setHeartbeatPeriodMs(tubeConfig.getTubeHeartbeatPeriodMs());
+ clientConfig.setRpcTimeoutMs(tubeConfig.getTubeRpcTimeoutMs());
+ return clientConfig;
+ }
+
+ /**
+ * Convert a DataProxy event into a TubeMQ message: the event body becomes the
+ * payload, the local IP is attached as "dataproxyip", and the stream id
+ * (STREAM_ID header, else INAME header, else "") plus the package time are put
+ * into the system header.
+ *
+ * @param topicName the topic name of message
+ * @param event the DataProxy event
+ * @param addExtraAttrs whether to also copy group id, stream id, topic,
+ * message time and source IP headers as message attributes
+ * @return the message object
+ */
+ public static Message buildMessage(String topicName,
+ Event event, boolean addExtraAttrs) {
+ Message message = new Message(topicName, event.getBody());
+ message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
+ Map<String, String> headers = event.getHeaders();
+ String streamId;
+ if (headers.containsKey(AttributeConstants.STREAM_ID)) {
+ streamId = headers.get(AttributeConstants.STREAM_ID);
+ } else if (headers.containsKey(AttributeConstants.INAME)) {
+ streamId = headers.get(AttributeConstants.INAME);
+ } else {
+ streamId = "";
+ }
+ message.putSystemHeader(streamId, headers.get(ConfigConstants.PKG_TIME_KEY));
+ if (!addExtraAttrs) {
+ return message;
+ }
+ // common attributes, copied verbatim from the event headers
+ message.setAttrKeyVal(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID));
+ message.setAttrKeyVal(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID));
+ message.setAttrKeyVal(Constants.TOPIC, headers.get(Constants.TOPIC));
+ message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME));
+ message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP));
+ return message;
+ }
+}