You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/30 02:07:48 UTC
[inlong] 01/04: [INLONG-5723][DataProxy] Fix incorrect source and sink metrics when msgType equals 5. (#5728)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 5c3b457d54c7a38301b685f238e5de0c7f2b3ca2
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Mon Aug 29 14:08:10 2022 +0800
[INLONG-5723][DataProxy] Fix incorrect source and sink metrics when msgType equals 5. (#5728)
---
.../dataproxy/source/ServerMessageHandler.java | 44 ++++++++++++----------
1 file changed, 24 insertions(+), 20 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 8c158d3ce..d79143260 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -17,28 +17,14 @@
package org.apache.inlong.dataproxy.source;
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
-import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
-
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
@@ -63,8 +49,21 @@ import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
+import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
/**
* Server message handler
@@ -400,6 +399,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
} else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
inLongMsgVer = 4;
}
+ int recordMsgCnt = Integer.parseInt(commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : messageMap.entrySet()) {
for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
@@ -456,6 +456,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
// every message share the same msg cnt? what if msgType = 5
String proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
+ if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
+ commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(recordMsgCnt));
+ proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
+ }
headers.put(ConfigConstants.MSG_COUNTER_KEY, proxyMetricMsgCnt);
byte[] data = inLongMsg.buildArray();
@@ -510,7 +514,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
} catch (Throwable ex) {
logger.error("Error writting to channel,data will discard.", ex);
monitorIndexExt.incrementAndGet("EVENT_DROPPED");
- monitorIndex.addAndGet(new String(newbase), 0,0,0,
+ monitorIndex.addAndGet(new String(newbase), 0, 0, 0,
Integer.parseInt(proxyMetricMsgCnt));
this.addMetric(false, data.length, event);
throw new ChannelException("ProcessEvent error can't write event to channel.");
@@ -731,7 +735,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
/**
* addMetric
- *
+ *
* @param result
* @param size
* @param event