You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 11:41:36 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5941][DataProxy] Store the initial number of message items in Map (#5942)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 713c99adc [INLONG-5941][DataProxy] Store the initial number of message items in Map (#5942)
713c99adc is described below

commit 713c99adce757de0b8477cc8fc8cf41af45afe31
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Sep 20 12:57:03 2022 +0800

    [INLONG-5941][DataProxy] Store the initial number of message items in Map (#5942)
---
 .../inlong/dataproxy/source/ServerMessageHandler.java      | 14 +++-----------
 1 file changed, 3 insertions(+), 11 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 4ec9d5691..446fd9b86 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -362,14 +362,12 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
             for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
                 // build InLongMsg
                 String groupId = null;
-                int streamMsgCnt = 0;
                 InLongMsg inLongMsg = InLongMsg.newInLongMsg(this.isCompressed, inLongMsgVer);
                 if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
                     for (ProxyMessage message : streamIdEntry.getValue()) {
                         if (StringUtils.isEmpty(groupId)) {
                             groupId = message.getGroupId();
                         }
-                        streamMsgCnt++;
                         message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
                         inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     }
@@ -378,7 +376,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                         if (StringUtils.isEmpty(groupId)) {
                             groupId = message.getGroupId();
                         }
-                        streamMsgCnt++;
                         inLongMsg.addMsg(message.getData());
                     }
                 } else {
@@ -386,15 +383,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                         if (StringUtils.isEmpty(groupId)) {
                             groupId = message.getGroupId();
                         }
-                        streamMsgCnt++;
                         inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     }
                 }
-                if (recordMsgCnt != streamMsgCnt) {
-                    logger.debug("Found message count not equal, record={}, calculate value = {}",
-                            recordMsgCnt, streamMsgCnt);
-                }
-                commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(streamMsgCnt));
+                commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(recordMsgCnt));
                 // build headers
                 Map<String, String> headers = new HashMap<>();
                 headers.put(AttributeConstants.GROUP_ID, groupId);
@@ -456,12 +448,12 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                     monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
                     this.addStatistics(true, data.length, event);
                     monitorIndex.addAndGet(strBuff.toString(),
-                            streamMsgCnt, 1, data.length, 0);
+                            recordMsgCnt, 1, data.length, 0);
                     strBuff.delete(0, strBuff.length());
                 } catch (Throwable ex) {
                     logger.error("Error writting to channel,data will discard.", ex);
                     monitorIndexExt.incrementAndGet("EVENT_DROPPED");
-                    monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, streamMsgCnt);
+                    monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, recordMsgCnt);
                     this.addStatistics(false, data.length, event);
                     strBuff.delete(0, strBuff.length());
                     throw new ChannelException("ProcessEvent error can't write event to channel.");