You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 11:21:12 UTC

[inlong] branch release-1.3.0 updated (bd30f354f -> 668ff36c5)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a change to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from bd30f354f [INLONG-5917][DataProxy] Optimize TubeSink class (#5920)
     new 0edca6135 [INLONG-5917][DataProxy] Optimize TubeSink class (#5920)
     new 668ff36c5 [INLONG-5918][DataProxy] Optimize PulsarSink class (#5923)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/inlong/dataproxy/sink/PulsarSink.java   | 106 +++++++--------------
 .../org/apache/inlong/dataproxy/sink/TubeSink.java |   6 +-
 .../dataproxy/sink/pulsar/PulsarClientService.java |  37 ++++---
 .../inlong/dataproxy/sink/pulsar/SinkTask.java     |  68 +++++--------
 4 files changed, 81 insertions(+), 136 deletions(-)


[inlong] 01/02: [INLONG-5917][DataProxy] Optimize TubeSink class (#5920)

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 0edca61351e51b4926d6884a6eafe696155efc82
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Sep 26 19:17:54 2022 +0800

    [INLONG-5917][DataProxy] Optimize TubeSink class (#5920)
---
 .../src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java        | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 713cdfd72..a27a00758 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -19,6 +19,8 @@ package org.apache.inlong.dataproxy.sink;
 
 import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG;
 import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
+
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;


[inlong] 02/02: [INLONG-5918][DataProxy] Optimize PulsarSink class (#5923)

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 668ff36c5ff6e65a1dbdcc58060dee6c9d9a8370
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sat Sep 17 19:35:15 2022 +0800

    [INLONG-5918][DataProxy] Optimize PulsarSink class (#5923)
---
 .../apache/inlong/dataproxy/sink/PulsarSink.java   | 106 +++++++--------------
 .../org/apache/inlong/dataproxy/sink/TubeSink.java |   4 +-
 .../dataproxy/sink/pulsar/PulsarClientService.java |  37 ++++---
 .../inlong/dataproxy/sink/pulsar/SinkTask.java     |  68 +++++--------
 4 files changed, 79 insertions(+), 136 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 8127a4305..94b5ed2cc 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -51,6 +51,7 @@ import org.apache.inlong.dataproxy.sink.pulsar.CreatePulsarClientCallBack;
 import org.apache.inlong.dataproxy.sink.pulsar.PulsarClientService;
 import org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack;
 import org.apache.inlong.dataproxy.sink.pulsar.SinkTask;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
 import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.inlong.dataproxy.utils.NetworkUtils;
@@ -122,10 +123,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                     return System.currentTimeMillis();
                 }
             });
-    /*
-     * properties for header info
-     */
-    private static final String TOPIC = "topic";
     /*
      * for stat
      */
@@ -407,14 +404,16 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                             + "last long time it will cause memoryChannel full and fileChannel write.)", getName());
                     tx.rollback();
                     // metric
-                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, ""));
+                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
+                            event.getHeaders().get(ConfigConstants.TOPIC_KEY));
                     DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
                     metricItem.readFailCount.incrementAndGet();
                     metricItem.readFailSize.addAndGet(event.getBody().length);
                 } else {
                     tx.commit();
                     // metric
-                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, ""));
+                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
+                            event.getHeaders().get(ConfigConstants.TOPIC_KEY));
                     DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
                     metricItem.readSuccessCount.incrementAndGet();
                     metricItem.readSuccessSize.addAndGet(event.getBody().length);
@@ -437,70 +436,33 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
         return status;
     }
 
-    private void editStatistic(final Event event, String keyPostfix, boolean isOrder) {
-        String topic = "";
-        String streamId = "";
-        String nodeIp;
-        if (event != null) {
-            if (event.getHeaders().containsKey(TOPIC)) {
-                topic = event.getHeaders().get(TOPIC);
-            }
-            if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
-                streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
-            } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
-                streamId = event.getHeaders().get(AttributeConstants.INAME);
-            }
-
-            // Compatible agent
-            if (event.getHeaders().containsKey("ip")) {
-                event.getHeaders().put(ConfigConstants.REMOTE_IP_KEY, event.getHeaders().get("ip"));
-                event.getHeaders().remove("ip");
-            }
-
-            // Compatible agent
-            if (event.getHeaders().containsKey("time")) {
-                event.getHeaders().put(AttributeConstants.DATA_TIME, event.getHeaders().get("time"));
-                event.getHeaders().remove("time");
-            }
-
-            if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IP_KEY)) {
-                nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
-                if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IDC_KEY)) {
-                    if (nodeIp != null) {
-                        nodeIp = nodeIp.split(":")[0];
-                    }
-
-                    long msgCounterL = 1L;
-                    // msg counter
-                    if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
-                        msgCounterL = Integer.parseInt(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
-                    }
-
-                    String orderType = "non-order";
-                    if (isOrder) {
-                        orderType = "order";
-                    }
-                    StringBuilder newBase = new StringBuilder();
-                    newBase.append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
-                            .append(streamId).append(SEPARATOR).append(nodeIp)
-                            .append(SEPARATOR).append(NetworkUtils.getLocalIp())
-                            .append(SEPARATOR).append(orderType).append(SEPARATOR)
-                            .append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-
-                    long messageSize = event.getBody().length;
-                    if (event.getHeaders().get(ConfigConstants.TOTAL_LEN) != null) {
-                        messageSize = Long.parseLong(event.getHeaders().get(ConfigConstants.TOTAL_LEN));
-                    }
-
-                    if (keyPostfix != null && !keyPostfix.equals("")) {
-                        monitorIndex.addAndGet(new String(newBase), 0, 0, 0, (int) msgCounterL);
-                        if (logPrinterB.shouldPrint()) {
-                            logger.warn("error cannot send event, {} event size is {}", topic, messageSize);
-                        }
-                    } else {
-                        monitorIndex.addAndGet(new String(newBase), (int) msgCounterL, 1, messageSize, 0);
-                    }
-                }
+    private void editStatistic(final Event event, boolean isSuccess, boolean isOrder) {
+        if (event == null
+                || pulsarConfig.getStatIntervalSec() <= 0) {
+            return;
+        }
+        // get statistic items
+        String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
+        String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
+        String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
+        int intMsgCnt = Integer.parseInt(
+                event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
+        long dataTimeL = Long.parseLong(
+                event.getHeaders().get(AttributeConstants.DATA_TIME));
+        String orderType = isOrder ? "order" : "non-order";
+        StringBuilder newBase = new StringBuilder(512)
+                .append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
+                .append(streamId).append(SEPARATOR).append(nodeIp)
+                .append(SEPARATOR).append(NetworkUtils.getLocalIp())
+                .append(SEPARATOR).append(orderType).append(SEPARATOR)
+                .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+        long messageSize = event.getBody().length;
+        if (isSuccess) {
+            monitorIndex.addAndGet(newBase.toString(), intMsgCnt, 1, messageSize, 0);
+        } else {
+            monitorIndex.addAndGet(newBase.toString(), 0, 0, 0, intMsgCnt);
+            if (logPrinterB.shouldPrint()) {
+                logger.warn("error cannot send event, {} event size is {}", topic, messageSize);
             }
         }
     }
@@ -549,7 +511,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
         metricItem.sendCount.incrementAndGet();
         metricItem.sendSize.addAndGet(eventStat.getEvent().getBody().length);
         monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");
-        editStatistic(eventStat.getEvent(), null, eventStat.isOrderMessage());
+        editStatistic(eventStat.getEvent(), true, eventStat.isOrderMessage());
 
     }
 
@@ -571,7 +533,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                 logger.error("send failed for " + getName(), e);
             }
             if (eventStat.getRetryCnt() == 0) {
-                editStatistic(eventStat.getEvent(), "failure", eventStat.isOrderMessage());
+                editStatistic(eventStat.getEvent(), false, eventStat.isOrderMessage());
             }
         }
         Map<String, String> dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index a27a00758..e3fcc1155 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -516,8 +516,8 @@ public class TubeSink extends AbstractSink implements Configurable {
             long dataTimeL = Long.parseLong(
                     event.getHeaders().get(AttributeConstants.DATA_TIME));
             // build statistic key
-            StringBuilder newBase = new StringBuilder(512);
-            newBase.append(getName()).append(SEP_HASHTAG).append(topic)
+            StringBuilder newBase = new StringBuilder(512)
+                    .append(getName()).append(SEP_HASHTAG).append(topic)
                     .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
                     .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
                     .append(SEP_HASHTAG).append("non-order").append(SEP_HASHTAG)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 28af3fa1c..47cdff3fa 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -18,6 +18,13 @@
 package org.apache.inlong.dataproxy.sink.pulsar;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import io.netty.buffer.ByteBuf;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Event;
@@ -43,15 +50,6 @@ import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class PulsarClientService {
 
     private static final Logger logger = LoggerFactory.getLogger(PulsarClientService.class);
@@ -82,8 +80,6 @@ public class PulsarClientService {
     private int maxBatchingMessages = 1000;
     private long maxBatchingPublishDelayMillis = 1;
     private long retryIntervalWhenSendMsgError = 30 * 1000L;
-    private String localIp = "127.0.0.1";
-
     private int sinkThreadPoolSize;
 
     /**
@@ -120,7 +116,6 @@ public class PulsarClientService {
         maxBatchingPublishDelayMillis = pulsarConfig.getMaxBatchingPublishDelayMillis();
         producerInfoMap = new ConcurrentHashMap<>();
         topicSendIndexMap = new ConcurrentHashMap<>();
-        localIp = NetworkUtils.getLocalIp();
     }
 
     public void initCreateConnection(CreatePulsarClientCallBack callBack) {
@@ -141,11 +136,15 @@ public class PulsarClientService {
      * send message
      */
     public boolean sendMessage(int poolIndex, String topic, Event event,
-            SendMessageCallBack sendMessageCallBack, EventStat es) {
+                               SendMessageCallBack sendMessageCallBack, EventStat es) {
         TopicProducerInfo producerInfo = null;
         boolean result;
-        final String inlongStreamId = getInlongStreamId(event);
-        final String inlongGroupId = getInlongGroupId(event);
+        final String pkgVersion =
+                event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER);
+        final String inlongStreamId =
+                event.getHeaders().get(AttributeConstants.GROUP_ID);
+        final String inlongGroupId =
+                event.getHeaders().get(AttributeConstants.STREAM_ID);
         try {
             producerInfo = getProducerInfo(poolIndex, topic, inlongGroupId, inlongStreamId);
         } catch (Exception e) {
@@ -166,11 +165,6 @@ public class PulsarClientService {
             sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer info is null"));
             return true;
         }
-
-        Map<String, String> proMap = new HashMap<>();
-        proMap.put("data_proxy_ip", localIp);
-        proMap.put(inlongStreamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-
         TopicProducerInfo forCallBackP = producerInfo;
         Producer producer = producerInfo.getProducer(poolIndex);
         if (producer == null) {
@@ -179,6 +173,9 @@ public class PulsarClientService {
             sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer is null"));
             return true;
         }
+        // build and send message
+        Map<String, String> proMap =
+                MessageUtils.getXfsAttrs(event.getHeaders(), pkgVersion);
         if (es.isOrderMessage()) {
             String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
             try {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
index 2b44a2d88..efbaedfb6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
@@ -41,8 +41,6 @@ public class SinkTask extends Thread {
 
     private static final LogCounter logPrinterA = new LogCounter(10, 100000, 60 * 1000);
 
-    private static String TOPIC = "topic";
-
     /*
      * default value
      */
@@ -150,70 +148,56 @@ public class SinkTask extends Thread {
                     sinkCounter.incrementEventDrainAttemptCount();
                     event = eventStat.getEvent();
                 }
-
-                /*
-                 * get topic
-                 */
-                if (event.getHeaders().containsKey(TOPIC)) {
-                    topic = event.getHeaders().get(TOPIC);
+                // check event status
+                if (event == null) {
+                    logger.warn("Event is null!");
+                    continue;
                 }
+                // get topic
+                topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
                 if (StringUtils.isEmpty(topic)) {
                     String groupId = event.getHeaders().get(AttributeConstants.GROUP_ID);
                     String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
                     topic = MessageUtils.getTopic(pulsarSink.getTopicsProperties(), groupId, streamId);
                 }
-
-                if (event == null) {
-                    logger.warn("Event is null!");
-                    continue;
-                }
-
                 if (topic == null || topic.equals("")) {
-                    pulsarSink.handleMessageSendException(topic, eventStat, new Exception("topic"
-                            + " info is null"));
+                    pulsarSink.handleMessageSendException(topic, eventStat,
+                            new Exception(ConfigConstants.TOPIC_KEY + " info is null"));
                     processToReTrySend(eventStat);
                     logger.warn("no topic specified, so will retry send!");
                     continue;
                 }
-
+                // check whether order-type message
                 if (eventStat.isOrderMessage()) {
                     sleep(1000);
                 }
-
+                // check whether discard or send event
                 if (eventStat.getRetryCnt() > maxRetrySendCnt) {
                     logger.warn("Message will be discard! send times reach to max retry cnt."
                             + " topic = {}, max retry cnt = {}", topic, maxRetrySendCnt);
                     continue;
                 }
-
+                // check whether duplicated event
                 String clientSeqId = event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
-
-                boolean hasSend = false;
                 if (pulsarConfig.getClientIdCache() && clientSeqId != null) {
-                    hasSend = agentIdCache.asMap().containsKey(clientSeqId);
-                }
-
-                if (pulsarConfig.getClientIdCache() && clientSeqId != null && hasSend) {
+                    boolean hasSend = agentIdCache.asMap().containsKey(clientSeqId);
                     agentIdCache.put(clientSeqId, System.currentTimeMillis());
-                    if (logPrinterA.shouldPrint()) {
-                        logger.info("{} agent package {} existed,just discard.",
-                                getName(), clientSeqId);
-                    }
-                } else {
-                    if (pulsarConfig.getClientIdCache() && clientSeqId != null) {
-                        agentIdCache.put(clientSeqId, System.currentTimeMillis());
-                    }
-                    boolean sendResult = pulsarClientService.sendMessage(poolIndex, topic,
-                            event, pulsarSink, eventStat);
-                    if (!sendResult) {
-                        /*
-                         * only for order message
-                         */
-                        processToReTrySend(eventStat);
+                    if (hasSend) {
+                        if (logPrinterA.shouldPrint()) {
+                            logger.info("{} agent package {} existed,just discard.",
+                                    getName(), clientSeqId);
+                        }
+                        continue;
                     }
-                    currentInFlightCount.incrementAndGet();
-                    decrementFlag = true;
                 }
+                // send message
+                if (!pulsarClientService.sendMessage(
+                        poolIndex, topic, event, pulsarSink, eventStat)) {
+                    // only for order message
+                    processToReTrySend(eventStat);
+                }
+                currentInFlightCount.incrementAndGet();
+                decrementFlag = true;
             } catch (InterruptedException e) {
                 logger.error("Thread {} has been interrupted!",
                         Thread.currentThread().getName());