You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/17 11:35:20 UTC
[inlong] branch master updated: [INLONG-5918][DataProxy] Optimize PulsarSink class (#5923)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0c5e26628 [INLONG-5918][DataProxy] Optimize PulsarSink class (#5923)
0c5e26628 is described below
commit 0c5e266288410eb903f53cadd8ec6a55ab930d59
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sat Sep 17 19:35:15 2022 +0800
[INLONG-5918][DataProxy] Optimize PulsarSink class (#5923)
---
.../apache/inlong/dataproxy/sink/PulsarSink.java | 106 +++++++--------------
.../org/apache/inlong/dataproxy/sink/TubeSink.java | 4 +-
.../dataproxy/sink/pulsar/PulsarClientService.java | 38 ++++----
.../inlong/dataproxy/sink/pulsar/SinkTask.java | 68 +++++--------
4 files changed, 79 insertions(+), 137 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 71aceb9c5..69da396bc 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -51,6 +51,7 @@ import org.apache.inlong.dataproxy.sink.pulsar.CreatePulsarClientCallBack;
import org.apache.inlong.dataproxy.sink.pulsar.PulsarClientService;
import org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack;
import org.apache.inlong.dataproxy.sink.pulsar.SinkTask;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -123,10 +124,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
return System.currentTimeMillis();
}
});
- /*
- * properties for header info
- */
- private static final String TOPIC = "topic";
/*
* for stat
*/
@@ -408,14 +405,16 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
+ "last long time it will cause memoryChannel full and fileChannel write.)", getName());
tx.rollback();
// metric
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, ""));
+ dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
+ event.getHeaders().getOrDefault(ConfigConstants.TOPIC_KEY, "")); // "" when the topic header is absent, never null
DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
metricItem.readFailCount.incrementAndGet();
metricItem.readFailSize.addAndGet(event.getBody().length);
} else {
tx.commit();
// metric
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, ""));
+ dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
+ event.getHeaders().getOrDefault(ConfigConstants.TOPIC_KEY, "")); // "" when the topic header is absent, never null
DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
metricItem.readSuccessCount.incrementAndGet();
metricItem.readSuccessSize.addAndGet(event.getBody().length);
@@ -438,70 +437,33 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
return status;
}
- private void editStatistic(final Event event, String keyPostfix, boolean isOrder) {
- String topic = "";
- String streamId = "";
- String nodeIp;
- if (event != null) {
- if (event.getHeaders().containsKey(TOPIC)) {
- topic = event.getHeaders().get(TOPIC);
- }
- if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
- streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
- } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
- streamId = event.getHeaders().get(AttributeConstants.INAME);
- }
-
- // Compatible agent
- if (event.getHeaders().containsKey("ip")) {
- event.getHeaders().put(ConfigConstants.REMOTE_IP_KEY, event.getHeaders().get("ip"));
- event.getHeaders().remove("ip");
- }
-
- // Compatible agent
- if (event.getHeaders().containsKey("time")) {
- event.getHeaders().put(AttributeConstants.DATA_TIME, event.getHeaders().get("time"));
- event.getHeaders().remove("time");
- }
-
- if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IP_KEY)) {
- nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
- if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IDC_KEY)) {
- if (nodeIp != null) {
- nodeIp = nodeIp.split(":")[0];
- }
-
- long msgCounterL = 1L;
- // msg counter
- if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
- msgCounterL = Integer.parseInt(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
- }
-
- String orderType = "non-order";
- if (isOrder) {
- orderType = "order";
- }
- StringBuilder newBase = new StringBuilder();
- newBase.append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
- .append(streamId).append(SEPARATOR).append(nodeIp)
- .append(SEPARATOR).append(NetworkUtils.getLocalIp())
- .append(SEPARATOR).append(orderType).append(SEPARATOR)
- .append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-
- long messageSize = event.getBody().length;
- if (event.getHeaders().get(ConfigConstants.TOTAL_LEN) != null) {
- messageSize = Long.parseLong(event.getHeaders().get(ConfigConstants.TOTAL_LEN));
- }
-
- if (keyPostfix != null && !keyPostfix.equals("")) {
- monitorIndex.addAndGet(new String(newBase), 0, 0, 0, (int) msgCounterL);
- if (logPrinterB.shouldPrint()) {
- logger.warn("error cannot send event, {} event size is {}", topic, messageSize);
- }
- } else {
- monitorIndex.addAndGet(new String(newBase), (int) msgCounterL, 1, messageSize, 0);
- }
- }
+ private void editStatistic(final Event event, boolean isSuccess, boolean isOrder) { // add one sent (isSuccess) or failed event to monitorIndex
+ if (event == null
+ || pulsarConfig.getStatIntervalSec() <= 0) {
+ return;
+ }
+ // key fields: sink name, topic, streamId, nodeIp, local ip, order type, data time - joined by SEPARATOR
+ String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
+ String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); // NOTE(review): INAME is not consulted as a fallback - confirm STREAM_ID is always set
+ String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY); // NOTE(review): used verbatim, no ":port" stripping - confirm it is a bare IP
+ int intMsgCnt = Integer.parseInt( // an absent counter means a single message
+ event.getHeaders().getOrDefault(ConfigConstants.MSG_COUNTER_KEY, "1"));
+ long dataTimeL = Long.parseLong( // NOTE(review): throws NumberFormatException if DATA_TIME is missing - confirm sources always set it
+ event.getHeaders().get(AttributeConstants.DATA_TIME));
+ String orderType = isOrder ? "order" : "non-order";
+ StringBuilder newBase = new StringBuilder(512)
+ .append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
+ .append(streamId).append(SEPARATOR).append(nodeIp)
+ .append(SEPARATOR).append(NetworkUtils.getLocalIp())
+ .append(SEPARATOR).append(orderType).append(SEPARATOR)
+ .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+ long messageSize = event.getBody().length;
+ if (isSuccess) {
+ monitorIndex.addAndGet(newBase.toString(), intMsgCnt, 1, messageSize, 0);
+ } else {
+ monitorIndex.addAndGet(newBase.toString(), 0, 0, 0, intMsgCnt);
+ if (logPrinterB.shouldPrint()) {
+ logger.warn("error cannot send event, {} event size is {}", topic, messageSize);
}
}
}
@@ -550,7 +512,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
metricItem.sendCount.incrementAndGet();
metricItem.sendSize.addAndGet(eventStat.getEvent().getBody().length);
monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");
- editStatistic(eventStat.getEvent(), null, eventStat.isOrderMessage());
+ editStatistic(eventStat.getEvent(), true, eventStat.isOrderMessage());
}
@@ -572,7 +534,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
logger.error("send failed for " + getName(), e);
}
if (eventStat.getRetryCnt() == 0) {
- editStatistic(eventStat.getEvent(), "failure", eventStat.isOrderMessage());
+ editStatistic(eventStat.getEvent(), false, eventStat.isOrderMessage());
}
}
Map<String, String> dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 42e90ed0f..f1f2b6bbb 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -516,8 +516,8 @@ public class TubeSink extends AbstractSink implements Configurable {
long dataTimeL = Long.parseLong(
event.getHeaders().get(AttributeConstants.DATA_TIME));
// build statistic key
- StringBuilder newBase = new StringBuilder(512);
- newBase.append(getName()).append(SEP_HASHTAG).append(topic)
+ StringBuilder newBase = new StringBuilder(512)
+ .append(getName()).append(SEP_HASHTAG).append(topic)
.append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
.append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
.append(SEP_HASHTAG).append("non-order").append(SEP_HASHTAG)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 1bdcfaf11..7ef6228f9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -18,11 +18,17 @@
package org.apache.inlong.dataproxy.sink.pulsar;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
-import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.base.OrderEvent;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
@@ -43,15 +49,6 @@ import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
public class PulsarClientService {
private static final Logger logger = LoggerFactory.getLogger(PulsarClientService.class);
@@ -82,8 +79,6 @@ public class PulsarClientService {
private int maxBatchingMessages = 1000;
private long maxBatchingPublishDelayMillis = 1;
private long retryIntervalWhenSendMsgError = 30 * 1000L;
- private String localIp = "127.0.0.1";
-
private int sinkThreadPoolSize;
/**
@@ -120,7 +115,6 @@ public class PulsarClientService {
maxBatchingPublishDelayMillis = pulsarConfig.getMaxBatchingPublishDelayMillis();
producerInfoMap = new ConcurrentHashMap<>();
topicSendIndexMap = new ConcurrentHashMap<>();
- localIp = NetworkUtils.getLocalIp();
}
public void initCreateConnection(CreatePulsarClientCallBack callBack) {
@@ -141,11 +135,15 @@ public class PulsarClientService {
* send message
*/
public boolean sendMessage(int poolIndex, String topic, Event event,
- SendMessageCallBack sendMessageCallBack, EventStat es) {
+ SendMessageCallBack sendMessageCallBack, EventStat es) {
TopicProducerInfo producerInfo = null;
boolean result;
- final String inlongStreamId = getInlongStreamId(event);
- final String inlongGroupId = getInlongGroupId(event);
+ final String pkgVersion =
+ event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER);
+ final String inlongStreamId =
+ event.getHeaders().get(AttributeConstants.STREAM_ID); // stream id comes from STREAM_ID
+ final String inlongGroupId =
+ event.getHeaders().get(AttributeConstants.GROUP_ID); // group id comes from GROUP_ID
try {
producerInfo = getProducerInfo(poolIndex, topic, inlongGroupId, inlongStreamId);
} catch (Exception e) {
@@ -166,11 +164,6 @@ public class PulsarClientService {
sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer info is null"));
return true;
}
-
- Map<String, String> proMap = new HashMap<>();
- proMap.put("data_proxy_ip", localIp);
- proMap.put(inlongStreamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-
TopicProducerInfo forCallBackP = producerInfo;
Producer producer = producerInfo.getProducer(poolIndex);
if (producer == null) {
@@ -179,6 +172,9 @@ public class PulsarClientService {
sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer is null"));
return true;
}
+ // build and send message
+ Map<String, String> proMap =
+ MessageUtils.getXfsAttrs(event.getHeaders(), pkgVersion);
if (es.isOrderMessage()) {
String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
try {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
index 2b44a2d88..efbaedfb6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
@@ -41,8 +41,6 @@ public class SinkTask extends Thread {
private static final LogCounter logPrinterA = new LogCounter(10, 100000, 60 * 1000);
- private static String TOPIC = "topic";
-
/*
* default value
*/
@@ -150,70 +148,56 @@ public class SinkTask extends Thread {
sinkCounter.incrementEventDrainAttemptCount();
event = eventStat.getEvent();
}
-
- /*
- * get topic
- */
- if (event.getHeaders().containsKey(TOPIC)) {
- topic = event.getHeaders().get(TOPIC);
+ // skip null events before any header is read
+ if (event == null) {
+ logger.warn("Event is null!");
+ continue;
}
+ // resolve the topic: use the event header, else look it up by groupId/streamId
+ topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
if (StringUtils.isEmpty(topic)) {
String groupId = event.getHeaders().get(AttributeConstants.GROUP_ID);
String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
topic = MessageUtils.getTopic(pulsarSink.getTopicsProperties(), groupId, streamId);
}
-
- if (event == null) {
- logger.warn("Event is null!");
- continue;
- }
-
if (topic == null || topic.equals("")) {
- pulsarSink.handleMessageSendException(topic, eventStat, new Exception("topic"
- + " info is null"));
+ pulsarSink.handleMessageSendException(topic, eventStat,
+ new Exception(ConfigConstants.TOPIC_KEY + " info is null"));
processToReTrySend(eventStat);
logger.warn("no topic specified, so will retry send!");
continue;
}
-
+ // order-type messages are paced: sleep 1s before sending
if (eventStat.isOrderMessage()) {
sleep(1000);
}
-
+ // drop the event once its retry count exceeds maxRetrySendCnt
if (eventStat.getRetryCnt() > maxRetrySendCnt) {
logger.warn("Message will be discard! send times reach to max retry cnt."
+ " topic = {}, max retry cnt = {}", topic, maxRetrySendCnt);
continue;
}
-
+ // de-duplicate by client sequence id when the id cache is enabled; the cache timestamp is refreshed either way
String clientSeqId = event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
-
- boolean hasSend = false;
if (pulsarConfig.getClientIdCache() && clientSeqId != null) {
- hasSend = agentIdCache.asMap().containsKey(clientSeqId);
- }
-
- if (pulsarConfig.getClientIdCache() && clientSeqId != null && hasSend) {
+ boolean hasSend = agentIdCache.asMap().containsKey(clientSeqId);
agentIdCache.put(clientSeqId, System.currentTimeMillis());
- if (logPrinterA.shouldPrint()) {
- logger.info("{} agent package {} existed,just discard.",
- getName(), clientSeqId);
- }
- } else {
- if (pulsarConfig.getClientIdCache() && clientSeqId != null) {
- agentIdCache.put(clientSeqId, System.currentTimeMillis());
- }
- boolean sendResult = pulsarClientService.sendMessage(poolIndex, topic,
- event, pulsarSink, eventStat);
- if (!sendResult) {
- /*
- * only for order message
- */
- processToReTrySend(eventStat);
+ if (hasSend) {
+ if (logPrinterA.shouldPrint()) {
+ logger.info("{} agent package {} existed,just discard.",
+ getName(), clientSeqId);
+ }
+ continue;
}
- currentInFlightCount.incrementAndGet();
- decrementFlag = true;
}
+ // send, then count the event as in flight regardless of the result
+ if (!pulsarClientService.sendMessage(
+ poolIndex, topic, event, pulsarSink, eventStat)) {
+ // a false result is expected only for order messages; queue the event for retry
+ processToReTrySend(eventStat);
+ }
+ currentInFlightCount.incrementAndGet();
+ decrementFlag = true;
} catch (InterruptedException e) {
logger.error("Thread {} has been interrupted!",
Thread.currentThread().getName());