You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 11:41:36 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5941][DataProxy] Store the initial number of message items in Map (#5942)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 713c99adc [INLONG-5941][DataProxy] Store the initial number of message items in Map (#5942)
713c99adc is described below
commit 713c99adce757de0b8477cc8fc8cf41af45afe31
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Sep 20 12:57:03 2022 +0800
[INLONG-5941][DataProxy] Store the initial number of message items in Map (#5942)
---
.../inlong/dataproxy/source/ServerMessageHandler.java | 14 +++-----------
1 file changed, 3 insertions(+), 11 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 4ec9d5691..446fd9b86 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -362,14 +362,12 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
// build InLongMsg
String groupId = null;
- int streamMsgCnt = 0;
InLongMsg inLongMsg = InLongMsg.newInLongMsg(this.isCompressed, inLongMsgVer);
if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
for (ProxyMessage message : streamIdEntry.getValue()) {
if (StringUtils.isEmpty(groupId)) {
groupId = message.getGroupId();
}
- streamMsgCnt++;
message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
}
@@ -378,7 +376,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
if (StringUtils.isEmpty(groupId)) {
groupId = message.getGroupId();
}
- streamMsgCnt++;
inLongMsg.addMsg(message.getData());
}
} else {
@@ -386,15 +383,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
if (StringUtils.isEmpty(groupId)) {
groupId = message.getGroupId();
}
- streamMsgCnt++;
inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
}
}
- if (recordMsgCnt != streamMsgCnt) {
- logger.debug("Found message count not equal, record={}, calculate value = {}",
- recordMsgCnt, streamMsgCnt);
- }
- commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(streamMsgCnt));
+ commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(recordMsgCnt));
// build headers
Map<String, String> headers = new HashMap<>();
headers.put(AttributeConstants.GROUP_ID, groupId);
@@ -456,12 +448,12 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
this.addStatistics(true, data.length, event);
monitorIndex.addAndGet(strBuff.toString(),
- streamMsgCnt, 1, data.length, 0);
+ recordMsgCnt, 1, data.length, 0);
strBuff.delete(0, strBuff.length());
} catch (Throwable ex) {
logger.error("Error writting to channel,data will discard.", ex);
monitorIndexExt.incrementAndGet("EVENT_DROPPED");
- monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, streamMsgCnt);
+ monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, recordMsgCnt);
this.addStatistics(false, data.length, event);
strBuff.delete(0, strBuff.length());
throw new ChannelException("ProcessEvent error can't write event to channel.");