You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 11:21:14 UTC

[inlong] 02/02: [INLONG-5918][DataProxy] Optimize PulsarSink class (#5923)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 668ff36c5ff6e65a1dbdcc58060dee6c9d9a8370
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sat Sep 17 19:35:15 2022 +0800

    [INLONG-5918][DataProxy] Optimize PulsarSink class (#5923)
---
 .../apache/inlong/dataproxy/sink/PulsarSink.java   | 106 +++++++--------------
 .../org/apache/inlong/dataproxy/sink/TubeSink.java |   4 +-
 .../dataproxy/sink/pulsar/PulsarClientService.java |  37 ++++---
 .../inlong/dataproxy/sink/pulsar/SinkTask.java     |  68 +++++--------
 4 files changed, 79 insertions(+), 136 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 8127a4305..94b5ed2cc 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -51,6 +51,7 @@ import org.apache.inlong.dataproxy.sink.pulsar.CreatePulsarClientCallBack;
 import org.apache.inlong.dataproxy.sink.pulsar.PulsarClientService;
 import org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack;
 import org.apache.inlong.dataproxy.sink.pulsar.SinkTask;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
 import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.inlong.dataproxy.utils.NetworkUtils;
@@ -122,10 +123,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                     return System.currentTimeMillis();
                 }
             });
-    /*
-     * properties for header info
-     */
-    private static final String TOPIC = "topic";
     /*
      * for stat
      */
@@ -407,14 +404,16 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                             + "last long time it will cause memoryChannel full and fileChannel write.)", getName());
                     tx.rollback();
                     // metric
-                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, ""));
+                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
+                            event.getHeaders().get(ConfigConstants.TOPIC_KEY));
                     DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
                     metricItem.readFailCount.incrementAndGet();
                     metricItem.readFailSize.addAndGet(event.getBody().length);
                 } else {
                     tx.commit();
                     // metric
-                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, ""));
+                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
+                            event.getHeaders().get(ConfigConstants.TOPIC_KEY));
                     DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
                     metricItem.readSuccessCount.incrementAndGet();
                     metricItem.readSuccessSize.addAndGet(event.getBody().length);
@@ -437,70 +436,33 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
         return status;
     }
 
-    private void editStatistic(final Event event, String keyPostfix, boolean isOrder) {
-        String topic = "";
-        String streamId = "";
-        String nodeIp;
-        if (event != null) {
-            if (event.getHeaders().containsKey(TOPIC)) {
-                topic = event.getHeaders().get(TOPIC);
-            }
-            if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
-                streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
-            } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
-                streamId = event.getHeaders().get(AttributeConstants.INAME);
-            }
-
-            // Compatible agent
-            if (event.getHeaders().containsKey("ip")) {
-                event.getHeaders().put(ConfigConstants.REMOTE_IP_KEY, event.getHeaders().get("ip"));
-                event.getHeaders().remove("ip");
-            }
-
-            // Compatible agent
-            if (event.getHeaders().containsKey("time")) {
-                event.getHeaders().put(AttributeConstants.DATA_TIME, event.getHeaders().get("time"));
-                event.getHeaders().remove("time");
-            }
-
-            if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IP_KEY)) {
-                nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
-                if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IDC_KEY)) {
-                    if (nodeIp != null) {
-                        nodeIp = nodeIp.split(":")[0];
-                    }
-
-                    long msgCounterL = 1L;
-                    // msg counter
-                    if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
-                        msgCounterL = Integer.parseInt(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
-                    }
-
-                    String orderType = "non-order";
-                    if (isOrder) {
-                        orderType = "order";
-                    }
-                    StringBuilder newBase = new StringBuilder();
-                    newBase.append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
-                            .append(streamId).append(SEPARATOR).append(nodeIp)
-                            .append(SEPARATOR).append(NetworkUtils.getLocalIp())
-                            .append(SEPARATOR).append(orderType).append(SEPARATOR)
-                            .append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-
-                    long messageSize = event.getBody().length;
-                    if (event.getHeaders().get(ConfigConstants.TOTAL_LEN) != null) {
-                        messageSize = Long.parseLong(event.getHeaders().get(ConfigConstants.TOTAL_LEN));
-                    }
-
-                    if (keyPostfix != null && !keyPostfix.equals("")) {
-                        monitorIndex.addAndGet(new String(newBase), 0, 0, 0, (int) msgCounterL);
-                        if (logPrinterB.shouldPrint()) {
-                            logger.warn("error cannot send event, {} event size is {}", topic, messageSize);
-                        }
-                    } else {
-                        monitorIndex.addAndGet(new String(newBase), (int) msgCounterL, 1, messageSize, 0);
-                    }
-                }
+    private void editStatistic(final Event event, boolean isSuccess, boolean isOrder) {
+        if (event == null
+                || pulsarConfig.getStatIntervalSec() <= 0) {
+            return;
+        }
+        // get statistic items
+        String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
+        String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
+        String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
+        int intMsgCnt = Integer.parseInt(
+                event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
+        long dataTimeL = Long.parseLong(
+                event.getHeaders().get(AttributeConstants.DATA_TIME));
+        String orderType = isOrder ? "order" : "non-order";
+        StringBuilder newBase = new StringBuilder(512)
+                .append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
+                .append(streamId).append(SEPARATOR).append(nodeIp)
+                .append(SEPARATOR).append(NetworkUtils.getLocalIp())
+                .append(SEPARATOR).append(orderType).append(SEPARATOR)
+                .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+        long messageSize = event.getBody().length;
+        if (isSuccess) {
+            monitorIndex.addAndGet(newBase.toString(), intMsgCnt, 1, messageSize, 0);
+        } else {
+            monitorIndex.addAndGet(newBase.toString(), 0, 0, 0, intMsgCnt);
+            if (logPrinterB.shouldPrint()) {
+                logger.warn("error cannot send event, {} event size is {}", topic, messageSize);
             }
         }
     }
@@ -549,7 +511,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
         metricItem.sendCount.incrementAndGet();
         metricItem.sendSize.addAndGet(eventStat.getEvent().getBody().length);
         monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");
-        editStatistic(eventStat.getEvent(), null, eventStat.isOrderMessage());
+        editStatistic(eventStat.getEvent(), true, eventStat.isOrderMessage());
 
     }
 
@@ -571,7 +533,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                 logger.error("send failed for " + getName(), e);
             }
             if (eventStat.getRetryCnt() == 0) {
-                editStatistic(eventStat.getEvent(), "failure", eventStat.isOrderMessage());
+                editStatistic(eventStat.getEvent(), false, eventStat.isOrderMessage());
             }
         }
         Map<String, String> dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index a27a00758..e3fcc1155 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -516,8 +516,8 @@ public class TubeSink extends AbstractSink implements Configurable {
             long dataTimeL = Long.parseLong(
                     event.getHeaders().get(AttributeConstants.DATA_TIME));
             // build statistic key
-            StringBuilder newBase = new StringBuilder(512);
-            newBase.append(getName()).append(SEP_HASHTAG).append(topic)
+            StringBuilder newBase = new StringBuilder(512)
+                    .append(getName()).append(SEP_HASHTAG).append(topic)
                     .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
                     .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
                     .append(SEP_HASHTAG).append("non-order").append(SEP_HASHTAG)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 28af3fa1c..47cdff3fa 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -18,6 +18,13 @@
 package org.apache.inlong.dataproxy.sink.pulsar;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import io.netty.buffer.ByteBuf;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Event;
@@ -43,15 +50,6 @@ import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class PulsarClientService {
 
     private static final Logger logger = LoggerFactory.getLogger(PulsarClientService.class);
@@ -82,8 +80,6 @@ public class PulsarClientService {
     private int maxBatchingMessages = 1000;
     private long maxBatchingPublishDelayMillis = 1;
     private long retryIntervalWhenSendMsgError = 30 * 1000L;
-    private String localIp = "127.0.0.1";
-
     private int sinkThreadPoolSize;
 
     /**
@@ -120,7 +116,6 @@ public class PulsarClientService {
         maxBatchingPublishDelayMillis = pulsarConfig.getMaxBatchingPublishDelayMillis();
         producerInfoMap = new ConcurrentHashMap<>();
         topicSendIndexMap = new ConcurrentHashMap<>();
-        localIp = NetworkUtils.getLocalIp();
     }
 
     public void initCreateConnection(CreatePulsarClientCallBack callBack) {
@@ -141,11 +136,15 @@ public class PulsarClientService {
      * send message
      */
     public boolean sendMessage(int poolIndex, String topic, Event event,
-            SendMessageCallBack sendMessageCallBack, EventStat es) {
+                               SendMessageCallBack sendMessageCallBack, EventStat es) {
         TopicProducerInfo producerInfo = null;
         boolean result;
-        final String inlongStreamId = getInlongStreamId(event);
-        final String inlongGroupId = getInlongGroupId(event);
+        final String pkgVersion =
+                event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER);
+        final String inlongStreamId =
+                event.getHeaders().get(AttributeConstants.STREAM_ID);
+        final String inlongGroupId =
+                event.getHeaders().get(AttributeConstants.GROUP_ID);
         try {
             producerInfo = getProducerInfo(poolIndex, topic, inlongGroupId, inlongStreamId);
         } catch (Exception e) {
@@ -166,11 +165,6 @@ public class PulsarClientService {
             sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer info is null"));
             return true;
         }
-
-        Map<String, String> proMap = new HashMap<>();
-        proMap.put("data_proxy_ip", localIp);
-        proMap.put(inlongStreamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-
         TopicProducerInfo forCallBackP = producerInfo;
         Producer producer = producerInfo.getProducer(poolIndex);
         if (producer == null) {
@@ -179,6 +173,9 @@ public class PulsarClientService {
             sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer is null"));
             return true;
         }
+        // build and send message
+        Map<String, String> proMap =
+                MessageUtils.getXfsAttrs(event.getHeaders(), pkgVersion);
         if (es.isOrderMessage()) {
             String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
             try {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
index 2b44a2d88..efbaedfb6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
@@ -41,8 +41,6 @@ public class SinkTask extends Thread {
 
     private static final LogCounter logPrinterA = new LogCounter(10, 100000, 60 * 1000);
 
-    private static String TOPIC = "topic";
-
     /*
      * default value
      */
@@ -150,70 +148,56 @@ public class SinkTask extends Thread {
                     sinkCounter.incrementEventDrainAttemptCount();
                     event = eventStat.getEvent();
                 }
-
-                /*
-                 * get topic
-                 */
-                if (event.getHeaders().containsKey(TOPIC)) {
-                    topic = event.getHeaders().get(TOPIC);
+                // check event status
+                if (event == null) {
+                    logger.warn("Event is null!");
+                    continue;
                 }
+                // get topic
+                topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
                 if (StringUtils.isEmpty(topic)) {
                     String groupId = event.getHeaders().get(AttributeConstants.GROUP_ID);
                     String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
                     topic = MessageUtils.getTopic(pulsarSink.getTopicsProperties(), groupId, streamId);
                 }
-
-                if (event == null) {
-                    logger.warn("Event is null!");
-                    continue;
-                }
-
                 if (topic == null || topic.equals("")) {
-                    pulsarSink.handleMessageSendException(topic, eventStat, new Exception("topic"
-                            + " info is null"));
+                    pulsarSink.handleMessageSendException(topic, eventStat,
+                            new Exception(ConfigConstants.TOPIC_KEY + " info is null"));
                     processToReTrySend(eventStat);
                     logger.warn("no topic specified, so will retry send!");
                     continue;
                 }
-
+                // check whether order-type message
                 if (eventStat.isOrderMessage()) {
                     sleep(1000);
                 }
-
+                // check whether discard or send event
                 if (eventStat.getRetryCnt() > maxRetrySendCnt) {
                     logger.warn("Message will be discard! send times reach to max retry cnt."
                             + " topic = {}, max retry cnt = {}", topic, maxRetrySendCnt);
                     continue;
                 }
-
+                // check whether duplicated event
                 String clientSeqId = event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
-
-                boolean hasSend = false;
                 if (pulsarConfig.getClientIdCache() && clientSeqId != null) {
-                    hasSend = agentIdCache.asMap().containsKey(clientSeqId);
-                }
-
-                if (pulsarConfig.getClientIdCache() && clientSeqId != null && hasSend) {
+                    boolean hasSend = agentIdCache.asMap().containsKey(clientSeqId);
                     agentIdCache.put(clientSeqId, System.currentTimeMillis());
-                    if (logPrinterA.shouldPrint()) {
-                        logger.info("{} agent package {} existed,just discard.",
-                                getName(), clientSeqId);
-                    }
-                } else {
-                    if (pulsarConfig.getClientIdCache() && clientSeqId != null) {
-                        agentIdCache.put(clientSeqId, System.currentTimeMillis());
-                    }
-                    boolean sendResult = pulsarClientService.sendMessage(poolIndex, topic,
-                            event, pulsarSink, eventStat);
-                    if (!sendResult) {
-                        /*
-                         * only for order message
-                         */
-                        processToReTrySend(eventStat);
+                    if (hasSend) {
+                        if (logPrinterA.shouldPrint()) {
+                            logger.info("{} agent package {} existed,just discard.",
+                                    getName(), clientSeqId);
+                        }
+                        continue;
                     }
-                    currentInFlightCount.incrementAndGet();
-                    decrementFlag = true;
                 }
+                // send message
+                if (!pulsarClientService.sendMessage(
+                        poolIndex, topic, event, pulsarSink, eventStat)) {
+                    // only for order message
+                    processToReTrySend(eventStat);
+                }
+                currentInFlightCount.incrementAndGet();
+                decrementFlag = true;
             } catch (InterruptedException e) {
                 logger.error("Thread {} has been interrupted!",
                         Thread.currentThread().getName());