You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/30 02:07:48 UTC

[inlong] 01/04: [INLONG-5723][DataProxy] Fix source and sink metrics are incorrect when msgType equals 5. (#5728)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 5c3b457d54c7a38301b685f238e5de0c7f2b3ca2
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Mon Aug 29 14:08:10 2022 +0800

    [INLONG-5723][DataProxy] Fix source and sink metrics are incorrect when msgType equals 5. (#5728)
---
 .../dataproxy/source/ServerMessageHandler.java     | 44 ++++++++++++----------
 1 file changed, 24 insertions(+), 20 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 8c158d3ce..d79143260 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -17,28 +17,14 @@
 
 package org.apache.inlong.dataproxy.source;
 
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
-import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
-
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.group.ChannelGroup;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
@@ -63,8 +49,21 @@ import org.apache.inlong.dataproxy.utils.NetworkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
+import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
 
 /**
  * Server message handler
@@ -400,6 +399,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
             inLongMsgVer = 4;
         }
+        int recordMsgCnt = Integer.parseInt(commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
 
         for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : messageMap.entrySet()) {
             for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
@@ -456,6 +456,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
                 // every message share the same msg cnt? what if msgType = 5
                 String proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
+                if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
+                    commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(recordMsgCnt));
+                    proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
+                }
                 headers.put(ConfigConstants.MSG_COUNTER_KEY, proxyMetricMsgCnt);
 
                 byte[] data = inLongMsg.buildArray();
@@ -510,7 +514,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 } catch (Throwable ex) {
                     logger.error("Error writting to channel,data will discard.", ex);
                     monitorIndexExt.incrementAndGet("EVENT_DROPPED");
-                    monitorIndex.addAndGet(new String(newbase), 0,0,0,
+                    monitorIndex.addAndGet(new String(newbase), 0, 0, 0,
                             Integer.parseInt(proxyMetricMsgCnt));
                     this.addMetric(false, data.length, event);
                     throw new ChannelException("ProcessEvent error can't write event to channel.");
@@ -731,7 +735,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
 
     /**
      * addMetric
-     * 
+     *
      * @param result
      * @param size
      * @param event