You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 11:04:25 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5915][DataProxy] Optimize source's ServerMessageHandler class (#5916)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 07e429e8f [INLONG-5915][DataProxy] Optimize source's ServerMessageHandler class (#5916)
07e429e8f is described below

commit 07e429e8f0d5c99d3779eedf67afc779a58bbb2a
Author: Goson Zhang <46...@qq.com>
AuthorDate: Fri Sep 16 17:13:21 2022 +0800

    [INLONG-5915][DataProxy] Optimize source's ServerMessageHandler class (#5916)
---
 .../dataproxy/http/SimpleMessageHandler.java       |   1 -
 .../dataproxy/source/DefaultServiceDecoder.java    | 253 ++++++------
 .../dataproxy/source/ServerMessageHandler.java     | 436 +++++++++------------
 .../inlong/dataproxy/source/ServiceDecoder.java    |  11 +-
 .../dataproxy/source/SimpleMessageHandler.java     |   6 +-
 5 files changed, 313 insertions(+), 394 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 3f1bf425c..0689039e1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -153,7 +153,6 @@ public class SimpleMessageHandler implements MessageHandler {
         headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
         headers.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
         byte[] data = inLongMsg.buildArray();
-        headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
         headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
         Event event = EventBuilder.withBody(data, headers);
         inLongMsg.reset();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index 355dc7a2e..11ac95e4e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -18,28 +18,27 @@
 package org.apache.inlong.dataproxy.source;
 
 import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.inlong.dataproxy.base.ProxyMessage;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.exception.ErrorCode;
 import org.apache.inlong.dataproxy.exception.MessageIDException;
-import org.apache.inlong.dataproxy.utils.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xerial.snappy.Snappy;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class DefaultServiceDecoder implements ServiceDecoder {
 
     private static final int BIN_MSG_TOTALLEN_OFFSET = 0;
@@ -109,33 +108,19 @@ public class DefaultServiceDecoder implements ServiceDecoder {
         return resultMap;
     }
 
-    private void handleDateTime(Map<String, String> commonAttrMap, Channel channel,
-            long uniq, long dataTime, int msgCount) {
+    private void handleDateTime(Map<String, String> commonAttrMap, long uniq,
+                                long dataTime, int msgCount, String strRemoteIP,
+                                long msgRcvTime) {
         commonAttrMap.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
-        String time = "";
-        if (commonAttrMap.containsKey(Constants.HEADER_KEY_MSG_TIME)) {
-            time = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
-        } else {
-            time = String.valueOf(dataTime);
-        }
-        StringBuilder sidBuilder = new StringBuilder();
-        /*
-         * udp need use msgEvent get remote address
-         */
-        String remoteAddress = "";
-        if (channel != null && channel.remoteAddress() != null) {
-            remoteAddress = channel.remoteAddress().toString();
-        }
-        sidBuilder.append(remoteAddress).append("#").append(time)
-                .append("#").append(uniq);
-        commonAttrMap.put(AttributeConstants.SEQUENCE_ID, new String(sidBuilder));
-
-        // datetime from sdk
+        String time = String.valueOf(dataTime);
+        commonAttrMap.put(AttributeConstants.SEQUENCE_ID,
+                new StringBuilder(256).append(strRemoteIP)
+                        .append("#").append(time).append("#").append(uniq).toString());
+        // dt from sdk
         commonAttrMap.put(AttributeConstants.DATA_TIME, String.valueOf(dataTime));
         commonAttrMap
-                .put(AttributeConstants.RCV_TIME, String.valueOf(System.currentTimeMillis()));
-        commonAttrMap.put(AttributeConstants.MESSAGE_COUNT,
-                String.valueOf(msgCount != 0 ? msgCount : 1));
+                .put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
+        commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(msgCount));
     }
 
     private boolean handleExtMap(Map<String, String> commonAttrMap, ByteBuf cb,
@@ -165,18 +150,18 @@ public class DefaultServiceDecoder implements ServiceDecoder {
     }
 
     private ByteBuffer handleTrace(Channel channel, ByteBuf cb, int extendField,
-            int msgHeadPos, int totalDataLen, int attrLen, String strAttr, int bodyLen) {
+                                   int msgHeadPos, int totalDataLen, int attrLen,
+                                   String strAttr, int bodyLen, long msgRcvTime) {
         // whether enable trace
-        boolean enableTrace = (((extendField & 0x2) >> 1) == 0x1);
         ByteBuffer dataBuf;
+        boolean enableTrace = (((extendField & 0x2) >> 1) == 0x1);
         if (!enableTrace) {
             dataBuf = ByteBuffer.allocate(totalDataLen + BIN_MSG_TOTALLEN_SIZE);
             cb.getBytes(msgHeadPos, dataBuf.array(), 0,
                     totalDataLen + BIN_MSG_TOTALLEN_SIZE);
         } else {
-            String traceInfo;
+            // get local address
             String strNode2Ip = null;
-
             SocketAddress loacalSockAddr = channel.localAddress();
             if (null != loacalSockAddr) {
                 strNode2Ip = loacalSockAddr.toString();
@@ -187,11 +172,9 @@ public class DefaultServiceDecoder implements ServiceDecoder {
                             strNode2Ip, loacalSockAddr);
                 }
             }
-
-            traceInfo = "node2ip=" + strNode2Ip + "&rtime2=" + System.currentTimeMillis();
-
+            // build trace information
             int newTotalLen = 0;
-
+            String traceInfo = "node2ip=" + strNode2Ip + "&rtime2=" + msgRcvTime;
             if (attrLen != 0) {
                 newTotalLen = totalDataLen + traceInfo.length() + "&".length();
                 strAttr = strAttr + "&" + traceInfo;
@@ -199,7 +182,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
                 newTotalLen = totalDataLen + traceInfo.length();
                 strAttr = traceInfo;
             }
-
+            // build trace information bytes
             dataBuf = ByteBuffer.allocate(newTotalLen + BIN_MSG_TOTALLEN_SIZE);
             cb.getBytes(msgHeadPos, dataBuf.array(), 0,
                     bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_ATTRLEN_SIZE
@@ -207,11 +190,9 @@ public class DefaultServiceDecoder implements ServiceDecoder {
             dataBuf.putShort(
                     bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_ATTRLEN_SIZE - BIN_MSG_MAGIC_SIZE),
                     (short) strAttr.length());
-
             System.arraycopy(strAttr.getBytes(StandardCharsets.UTF_8), 0, dataBuf.array(),
                     bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_MAGIC_SIZE),
                     strAttr.length());
-
             dataBuf.putInt(0, newTotalLen);
             dataBuf.putShort(newTotalLen + BIN_MSG_TOTALLEN_SIZE - BIN_MSG_MAGIC_SIZE,
                     (short) 0xee01);
@@ -223,57 +204,51 @@ public class DefaultServiceDecoder implements ServiceDecoder {
      * extract bin data, message type is 7
      */
     private Map<String, Object> extractNewBinData(Map<String, Object> resultMap,
-            ByteBuf cb, Channel channel,
-            int totalDataLen, MsgType msgType) throws Exception {
+                                                  ByteBuf cb, Channel channel,
+                                                  int totalDataLen, MsgType msgType,
+                                                  String strRemoteIP,
+                                                  long msgRcvTime) throws Exception {
         int msgHeadPos = cb.readerIndex() - 5;
-
+        // get body length
         int bodyLen = cb.getInt(msgHeadPos + BIN_MSG_BODYLEN_OFFSET);
+        if (bodyLen == 0) {
+            throw new Exception(
+                    "Error msg,  bodyLen is empty; connection info:" + strRemoteIP);
+        }
+        // get attribute length
         int attrLen = cb.getShort(msgHeadPos + BIN_MSG_BODY_OFFSET + bodyLen);
+        // get msg magic
         int msgMagic = cb.getUnsignedShort(msgHeadPos + BIN_MSG_BODY_OFFSET
                 + bodyLen + BIN_MSG_ATTRLEN_SIZE + attrLen);
-
-        if (bodyLen == 0) {
-            throw new Exception(new Throwable("err msg,  bodyLen is empty"
-                    + ";Connection info:" + channel.toString()));
-        }
-
-        if ((totalDataLen + BIN_MSG_TOTALLEN_SIZE < (bodyLen + attrLen + BIN_MSG_FORMAT_SIZE))
-                || (msgMagic != BIN_MSG_MAGIC)) {
-            throw new Exception(new Throwable(
-                    "err msg, bodyLen + attrLen > totalDataLen,or msgMagic is valid! and bodyLen="
-                            + bodyLen + ",totalDataLen=" + totalDataLen + ",attrLen=" + attrLen
+        if ((msgMagic != BIN_MSG_MAGIC)
+                || (totalDataLen + BIN_MSG_TOTALLEN_SIZE < (bodyLen + attrLen + BIN_MSG_FORMAT_SIZE))) {
+            throw new Exception(
+                    "Error msg, bodyLen + attrLen > totalDataLen,or msgMagic is valid! and bodyLen="
+                            + bodyLen + ",attrLen=" + attrLen + ",totalDataLen=" + totalDataLen
                             + ";magic=" + Integer.toHexString(msgMagic)
-                            + ";Connection info:" + channel.toString()));
+                            + "; connection info:" + strRemoteIP);
         }
-
+        // read data from ByteBuf
         int groupIdNum = cb.readUnsignedShort();
         int streamIdNum = cb.readUnsignedShort();
         final int extendField = cb.readUnsignedShort();
         long dataTime = cb.readUnsignedInt();
+        dataTime = dataTime * 1000;
         int msgCount = cb.readUnsignedShort();
+        msgCount = (msgCount != 0) ? msgCount : 1;
         long uniq = cb.readUnsignedInt();
-
-        dataTime = dataTime * 1000;
-        Map<String, String> commonAttrMap = new HashMap<String, String>();
         cb.skipBytes(BIN_MSG_BODYLEN_SIZE + bodyLen + BIN_MSG_ATTRLEN_SIZE);
-        resultMap.put(ConfigConstants.COMMON_ATTR_MAP, commonAttrMap);
-
-        resultMap.put(ConfigConstants.EXTRA_ATTR, ((extendField & 0x1) == 0x1) ? "true" : "false");
-
         // read body data
         byte[] bodyData = new byte[bodyLen];
         cb.getBytes(msgHeadPos + BIN_MSG_BODY_OFFSET, bodyData, 0, bodyLen);
-        resultMap.put(ConfigConstants.DECODER_BODY, bodyData);
-
         // read attr and write to map.
         String strAttr = null;
+        Map<String, String> commonAttrMap = new HashMap<>();
         if (attrLen != 0) {
             byte[] attrData = new byte[attrLen];
             cb.readBytes(attrData, 0, attrLen);
             strAttr = new String(attrData, StandardCharsets.UTF_8);
-            LOG.debug("strAttr = {}, length = {}", strAttr, strAttr.length());
             resultMap.put(ConfigConstants.DECODER_ATTRS, strAttr);
-
             try {
                 commonAttrMap.putAll(mapSplitter.split(strAttr));
             } catch (Exception e) {
@@ -281,26 +256,23 @@ public class DefaultServiceDecoder implements ServiceDecoder {
                 throw new MessageIDException(uniq,
                         ErrorCode.ATTR_ERROR,
                         new Throwable("[Parse Error]new six segment protocol ,attr is "
-                                + strAttr + " , channel info:" + channel.toString()));
+                                + strAttr + " , channel info:" + strRemoteIP));
             }
         }
-
+        // build attributes
+        resultMap.put(ConfigConstants.COMMON_ATTR_MAP, commonAttrMap);
+        resultMap.put(ConfigConstants.EXTRA_ATTR, ((extendField & 0x1) == 0x1) ? "true" : "false");
+        resultMap.put(ConfigConstants.DECODER_BODY, bodyData);
         try {
-            handleDateTime(commonAttrMap, channel, uniq, dataTime, msgCount);
-            final boolean index = handleExtMap(commonAttrMap, cb, resultMap, extendField, msgHeadPos);
+            // handle common attribute information
+            handleDateTime(commonAttrMap, uniq, dataTime, msgCount, strRemoteIP, msgRcvTime);
+            final boolean isIndexMsg =
+                    handleExtMap(commonAttrMap, cb, resultMap, extendField, msgHeadPos);
             ByteBuffer dataBuf = handleTrace(channel, cb, extendField, msgHeadPos,
-                    totalDataLen, attrLen, strAttr, bodyLen);
-
-            String groupId = null;
-            String streamId = null;
-
-            if (commonAttrMap.containsKey(AttributeConstants.GROUP_ID)) {
-                groupId = commonAttrMap.get(AttributeConstants.GROUP_ID);
-            }
-            if (commonAttrMap.containsKey(AttributeConstants.STREAM_ID)) {
-                streamId = commonAttrMap.get(AttributeConstants.STREAM_ID);
-            }
-
+                    totalDataLen, attrLen, strAttr, bodyLen, msgRcvTime);
+            // Check if groupId and streamId are number-to-name
+            String groupId = commonAttrMap.get(AttributeConstants.GROUP_ID);
+            String streamId = commonAttrMap.get(AttributeConstants.STREAM_ID);
             if ((groupId != null) && (streamId != null)) {
                 commonAttrMap.put(AttributeConstants.NUM2NAME, "FALSE");
                 dataBuf.putShort(BIN_MSG_EXTEND_OFFSET, (short) (extendField | 0x4));
@@ -312,15 +284,16 @@ public class DefaultServiceDecoder implements ServiceDecoder {
                     commonAttrMap.put(AttributeConstants.STREAMID_NUM, String.valueOf(streamIdNum));
                 }
             }
-
-            if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType) && !index) {
-                List<ProxyMessage> msgList = new ArrayList<>(1);
-                msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap, dataBuf.array()));
-                resultMap.put(ConfigConstants.MSG_LIST, msgList);
-            } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
+            // build ProxyMessage
+            if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
                 List<ProxyMessage> msgList = new ArrayList<>(1);
-                msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap,
-                        (byte[]) resultMap.get(ConfigConstants.FILE_BODY)));
+                if (isIndexMsg) {
+                    msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap,
+                            (byte[]) resultMap.get(ConfigConstants.FILE_BODY)));
+                } else {
+                    msgList.add(new ProxyMessage(groupId,
+                            streamId, commonAttrMap, dataBuf.array()));
+                }
                 resultMap.put(ConfigConstants.MSG_LIST, msgList);
             }
         } catch (Exception ex) {
@@ -328,7 +301,6 @@ public class DefaultServiceDecoder implements ServiceDecoder {
             cb.clear();
             throw new MessageIDException(uniq, ErrorCode.OTHER_ERROR, ex.getCause());
         }
-
         return resultMap;
     }
 
@@ -336,97 +308,97 @@ public class DefaultServiceDecoder implements ServiceDecoder {
      * extract bin data, message type less than 7
      */
     private Map<String, Object> extractDefaultData(Map<String, Object> resultMap,
-            ByteBuf cb, Channel channel,
-            int totalDataLen, MsgType msgType) throws Exception {
+                                                   ByteBuf cb, int totalDataLen,
+                                                   MsgType msgType, String strRemoteIP,
+                                                   long msgRcvTime) throws Exception {
         int bodyLen = cb.readInt();
         if (bodyLen == 0) {
-            throw new Exception(new Throwable("err msg,  bodyLen is empty" + ";"
-                    + "Connection info:" + channel.toString()));
+            throw new Exception("Error msg: bodyLen is empty, connection info:" + strRemoteIP);
         }
         // if body len is bigger than totalDataLen - 5(bodyLen bytes + message type bytes),
         // that means an invalid message, reject it.
         if (bodyLen > totalDataLen - 5) {
-            throw new Exception(new Throwable("err msg, firstLen > totalDataLen, and bodyLen="
+            throw new Exception("Error msg, firstLen > totalDataLen, and bodyLen="
                     + bodyLen + ",totalDataLen=" + totalDataLen
-                    + ";Connection info:" + channel.toString()));
+                    + ", connection info:" + strRemoteIP);
         }
-
         // extract body bytes
         byte[] bodyData = new byte[bodyLen];
         cb.readBytes(bodyData, 0, bodyLen);
         resultMap.put(ConfigConstants.DECODER_BODY, bodyData);
-
+        // extract attribute
         int attrLen = cb.readInt();
         // 9 means bodyLen bytes(4) + message type bytes(1) + attrLen bytes(4)
         if (totalDataLen != 9 + attrLen + bodyLen) {
-            throw new Exception(new Throwable(
-                    "err msg, totalDataLen != 9 + bodyLen + attrLen,and bodyLen=" + bodyLen
+            throw new Exception(
+                    "Error msg, totalDataLen != 9 + bodyLen + attrLen,and bodyLen=" + bodyLen
                             + ",totalDataLen=" + totalDataLen + ",attrDataLen=" + attrLen
-                            + ";Connection info:" + channel.toString()));
+                            + ", connection info:" + strRemoteIP);
         }
-
         // extract attr bytes
         byte[] attrData = new byte[attrLen];
         cb.readBytes(attrData, 0, attrLen);
-        String strAttr = new String(attrData, StandardCharsets.UTF_8);
-        resultMap.put(ConfigConstants.DECODER_ATTRS, strAttr);
-
         // convert attr bytes to map
         Map<String, String> commonAttrMap;
+        String strAttr = new String(attrData, StandardCharsets.UTF_8);
         try {
             commonAttrMap = new HashMap<>(mapSplitter.split(strAttr));
         } catch (Exception e) {
-            throw new Exception(new Throwable("Parse commonAttrMap error.commonAttrString is: "
-                    + strAttr + " ,channel is :" + channel.toString()));
+            throw new Exception("Parse commonAttrMap error.commonAttrString is: "
+                    + strAttr + " , connection info:" + strRemoteIP);
         }
+        resultMap.put(ConfigConstants.DECODER_ATTRS, strAttr);
         resultMap.put(ConfigConstants.COMMON_ATTR_MAP, commonAttrMap);
-
         // decompress body data if compress type exists.
         String compressType = commonAttrMap.get(AttributeConstants.COMPRESS_TYPE);
-        resultMap.put(ConfigConstants.COMPRESS_TYPE, compressType);
         if (StringUtils.isNotBlank(compressType)) {
+            resultMap.put(ConfigConstants.COMPRESS_TYPE, compressType);
             byte[] unCompressedData = processUnCompress(bodyData, compressType);
             if (unCompressedData == null || unCompressedData.length == 0) {
-                throw new Exception(new Throwable("Uncompressed data error!compress type:"
-                        + compressType + ";data:" + new String(bodyData, StandardCharsets.UTF_8)
-                        + ";attr:" + strAttr + ";channel:" + channel.toString()));
+                throw new Exception("Uncompressed data error! compress type:"
+                        + compressType + ";attr:" + strAttr
+                        + " , connection info:" + strRemoteIP);
             }
             bodyData = unCompressedData;
         }
-
         // fill up attr map with some keys.
-        commonAttrMap.put(AttributeConstants.RCV_TIME, String.valueOf(System.currentTimeMillis()));
         String groupId = commonAttrMap.get(AttributeConstants.GROUP_ID);
         String streamId = commonAttrMap.get(AttributeConstants.STREAM_ID);
-
-        // add message count attr
-        String cntStr = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
-        int msgCnt = cntStr != null ? Integer.parseInt(cntStr) : 1;
-        commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(msgCnt));
-
+        String strDataTime = commonAttrMap.get(AttributeConstants.DATA_TIME);
+        long longDataTime = NumberUtils.toLong(strDataTime, msgRcvTime);
+        commonAttrMap.put(AttributeConstants.DATA_TIME, String.valueOf(longDataTime));
+        commonAttrMap.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
+        // check message count attr
+        String strMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
+        int intMsgCnt = NumberUtils.toInt(strMsgCnt, 1);
+        commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(intMsgCnt));
         // extract data from bodyData and if message type is 5, convert data into list.
+        int calCnt = 0;
         List<ProxyMessage> msgList = null;
         ByteBuffer bodyBuffer = ByteBuffer.wrap(bodyData);
         if (MsgType.MSG_MULTI_BODY.equals(msgType)) {
-            msgList = new ArrayList<>(msgCnt);
+            msgList = new ArrayList<>(intMsgCnt);
             while (bodyBuffer.remaining() > 0) {
                 int singleMsgLen = bodyBuffer.getInt();
                 if (singleMsgLen <= 0 || singleMsgLen > bodyBuffer.remaining()) {
-                    throw new Exception(new Throwable("[Malformed Data]Invalid data len!channel is "
-                            + channel.toString()));
+                    throw new Exception(
+                            "[Malformed Data]Invalid data len! channel is " + strRemoteIP);
                 }
                 byte[] record = new byte[singleMsgLen];
                 bodyBuffer.get(record);
-
                 ProxyMessage message = new ProxyMessage(groupId, streamId, commonAttrMap, record);
                 msgList.add(message);
+                calCnt++;
             }
         } else {
             msgList = new ArrayList<>(1);
             msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap, bodyData));
+            calCnt++;
+        }
+        if (calCnt != intMsgCnt) {
+            commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(calCnt));
         }
         resultMap.put(ConfigConstants.MSG_LIST, msgList);
-
         return resultMap;
     }
 
@@ -452,7 +424,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
      * +--------+--------+--------+----------------+--------+----------------+------------------------+
      */
     @Override
-    public Map<String, Object> extractData(ByteBuf cb, Channel channel) throws Exception {
+    public Map<String, Object> extractData(ByteBuf cb, String strRemoteIP,
+                                           long msgRcvTime, Channel channel) throws Exception {
         Map<String, Object> resultMap = new HashMap<>();
         if (null == cb) {
             LOG.error("cb == null");
@@ -460,8 +433,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
         }
         int totalLen = cb.readableBytes();
         if (ConfigConstants.MSG_MAX_LENGTH_BYTES < totalLen) {
-            throw new Exception(new Throwable("err msg, ConfigConstants.MSG_MAX_LENGTH_BYTES "
-                    + "< totalLen, and  totalLen=" + totalLen));
+            throw new Exception("Error msg, ConfigConstants.MSG_MAX_LENGTH_BYTES "
+                    + "< totalLen, and  totalLen=" + totalLen);
         }
         // save index, reset it if buffer is not satisfied.
         cb.markReaderIndex();
@@ -481,14 +454,16 @@ public class DefaultServiceDecoder implements ServiceDecoder {
             if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
                 return extractNewBinHB(resultMap, cb, channel, totalDataLen);
             }
-
+            // process data message
             if (msgType.getValue() >= MsgType.MSG_BIN_MULTI_BODY.getValue()) {
                 resultMap.put(ConfigConstants.COMPRESS_TYPE, (compressType != 0) ? "snappy" : "");
-                return extractNewBinData(resultMap, cb, channel, totalDataLen, msgType);
+                return extractNewBinData(resultMap, cb,
+                        channel, totalDataLen, msgType,
+                        strRemoteIP, msgRcvTime);
             } else {
-                return extractDefaultData(resultMap, cb, channel, totalDataLen, msgType);
+                return extractDefaultData(resultMap, cb,
+                        totalDataLen, msgType, strRemoteIP, msgRcvTime);
             }
-
         } else {
             // reset index.
             cb.resetReaderIndex();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 9d3b32c4e..ab9055ad6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -17,8 +17,20 @@
 
 package org.apache.inlong.dataproxy.source;
 
+import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
+import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
@@ -26,7 +38,6 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.group.ChannelGroup;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
@@ -40,37 +51,16 @@ import org.apache.inlong.dataproxy.base.ProxyMessage;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.exception.ErrorCode;
 import org.apache.inlong.dataproxy.exception.MessageIDException;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
-import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.inlong.dataproxy.utils.NetworkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
-import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
-
 /**
  * Server message handler
  *
@@ -91,10 +81,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
             .on(AttributeConstants.SEPARATOR)
             .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
 
-    private static final DateTimeFormatter DATE_FORMATTER
-            = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
-    private static final ZoneId defZoneId = ZoneId.systemDefault();
-
     private AbstractSource source;
 
     private final ChannelGroup allChannels;
@@ -180,8 +166,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 strRemoteIp = strRemoteIp.substring(1, strRemoteIp.indexOf(':'));
             } catch (Exception ee) {
                 logger.warn("fail to get the remote IP, and strIP={},remoteSocketAddress={}",
-                        strRemoteIp,
-                        remoteSocketAddress);
+                        strRemoteIp, remoteSocketAddress);
             }
         }
         return strRemoteIp;
@@ -269,10 +254,11 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
     }
 
     private void checkGroupIdInfo(ProxyMessage message, Map<String, String> commonAttrMap,
-            Map<String, String> attrMap, AtomicReference<String> topicInfo) {
+                                  Map<String, String> attrMap, AtomicReference<String> topicInfo) {
         String groupId = message.getGroupId();
         String streamId = message.getStreamId();
         if (null != groupId) {
+            // get configured group Id
             String from = commonAttrMap.get(AttributeConstants.FROM);
             if ("dc".equals(from)) {
                 String dcInterfaceId = message.getStreamId();
@@ -284,14 +270,15 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                     message.setGroupId(groupId);
                 }
             }
-
-            String value = MessageUtils.getTopic(configManager.getTopicProperties(), groupId,
-                    streamId);
-            if (StringUtils.isNotEmpty(value)) {
-                topicInfo.set(value.trim());
+            // get configured topic name
+            String configTopic = MessageUtils.getTopic(
+                    configManager.getTopicProperties(), groupId, streamId);
+            if (StringUtils.isNotEmpty(configTopic)) {
+                topicInfo.set(configTopic.trim());
             }
-
-            Map<String, String> mxValue = configManager.getMxPropertiesMaps().get(groupId);
+            // get configured m value
+            Map<String, String> mxValue =
+                    configManager.getMxPropertiesMaps().get(groupId);
             if (mxValue != null && mxValue.size() != 0) {
                 message.getAttributeMap().putAll(mxValue);
             } else {
@@ -301,7 +288,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
             String num2name = commonAttrMap.get(AttributeConstants.NUM2NAME);
             String groupIdNum = commonAttrMap.get(AttributeConstants.GROUPID_NUM);
             String streamIdNum = commonAttrMap.get(AttributeConstants.STREAMID_NUM);
-
+            // get configured groupId and steamId by numbers
             if (configManager.getGroupIdMappingProperties() != null
                     && configManager.getStreamIdMappingProperties() != null) {
                 groupId = configManager.getGroupIdMappingProperties().get(groupIdNum);
@@ -311,21 +298,21 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                     String enableTrans =
                             (configManager.getGroupIdEnableMappingProperties() == null)
                                     ? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
-                    if (("TRUE".equalsIgnoreCase(enableTrans) && "TRUE"
-                            .equalsIgnoreCase(num2name))) {
+                    if (("TRUE".equalsIgnoreCase(enableTrans)
+                            && "TRUE".equalsIgnoreCase(num2name))) {
                         String extraAttr = "groupId=" + groupId + "&" + "streamId=" + streamId;
                         message.setData(newBinMsg(message.getData(), extraAttr));
                     }
-
+                    // reset groupId and streamId to message and attrMap
                     attrMap.put(AttributeConstants.GROUP_ID, groupId);
                     attrMap.put(AttributeConstants.STREAM_ID, streamId);
                     message.setGroupId(groupId);
                     message.setStreamId(streamId);
-
-                    String value = MessageUtils.getTopic(configManager.getTopicProperties(),
-                            groupId, streamId);
-                    if (StringUtils.isNotEmpty(value)) {
-                        topicInfo.set(value.trim());
+                    // get configured topic name
+                    String configTopic = MessageUtils.getTopic(
+                            configManager.getTopicProperties(), groupId, streamId);
+                    if (StringUtils.isNotEmpty(configTopic)) {
+                        topicInfo.set(configTopic.trim());
                     }
                 }
             }
@@ -333,60 +320,28 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
     }
 
     private boolean updateMsgList(List<ProxyMessage> msgList, Map<String, String> commonAttrMap,
-            Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
-            String strRemoteIP, MsgType msgType) {
+                                  Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+                                  String strRemoteIP) {
         for (ProxyMessage message : msgList) {
-            Map<String, String> attrMap = message.getAttributeMap();
-
             String topic = this.defaultTopic;
-
+            Map<String, String> attrMap = message.getAttributeMap();
             AtomicReference<String> topicInfo = new AtomicReference<>(topic);
             checkGroupIdInfo(message, commonAttrMap, attrMap, topicInfo);
             String groupId = message.getGroupId();
             String streamId = message.getStreamId();
+            if (streamId == null) {
+                streamId = "";
+                message.setStreamId(streamId);
+            }
             topic = topicInfo.get();
             if (StringUtils.isEmpty(topic)) {
                 logger.warn("Topic for message is null , inlongGroupId = {}, inlongStreamId = {}",
                         groupId, streamId);
             }
-            //                if(groupId==null)groupId="b_test";//default groupId
-
+            // append topic
             message.setTopic(topic);
             commonAttrMap.put(AttributeConstants.NODE_IP, strRemoteIP);
-
-            // whether sla
-            if (SLA_METRIC_GROUPID.equals(groupId)) {
-                commonAttrMap.put(SLA_METRIC_DATA, "true");
-                message.setTopic(SLA_METRIC_DATA);
-            }
-
-            if (groupId != null && streamId != null) {
-                String tubeSwtichKey = groupId + SEPARATOR + streamId;
-                if (configManager.getTubeSwitchProperties().get(tubeSwtichKey) != null
-                        && "false".equals(configManager.getTubeSwitchProperties()
-                        .get(tubeSwtichKey).trim())) {
-                    continue;
-                }
-            }
-
-            if (!"pb".equals(attrMap.get(AttributeConstants.MESSAGE_TYPE))
-                    && !MsgType.MSG_MULTI_BODY.equals(msgType)
-                    && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
-                byte[] data = message.getData();
-                if (data[data.length - 1] == '\n') {
-                    int tripDataLen = data.length - 1;
-                    if (data[data.length - 2] == '\r') {
-                        tripDataLen = data.length - 2;
-                    }
-                    byte[] tripData = new byte[tripDataLen];
-                    System.arraycopy(data, 0, tripData, 0, tripDataLen);
-                    message.setData(tripData);
-                }
-            }
-
-            if (streamId == null) {
-                streamId = "";
-            }
+            // add ProxyMessage
             HashMap<String, List<ProxyMessage>> streamIdMsgMap = messageMap
                     .computeIfAbsent(topic, k -> new HashMap<>());
             List<ProxyMessage> streamIdMsgList = streamIdMsgMap
@@ -397,8 +352,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
     }
 
     private void formatMessagesAndSend(ChannelHandlerContext ctx, Map<String, String> commonAttrMap,
-            Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
-            String strRemoteIP, MsgType msgType) throws MessageIDException {
+                                       Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+                                       String strRemoteIP, MsgType msgType, long msgRcvTime) throws MessageIDException {
 
         int inLongMsgVer = 1;
         if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
@@ -406,129 +361,124 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
             inLongMsgVer = 4;
         }
+        StringBuilder strBuff = new StringBuilder(512);
         int recordMsgCnt = Integer.parseInt(commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
-
+        // process each ProxyMessage
         for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : messageMap.entrySet()) {
             for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
-
+                // build InLongMsg
+                String groupId = null;
+                int streamMsgCnt = 0;
                 InLongMsg inLongMsg = InLongMsg.newInLongMsg(this.isCompressed, inLongMsgVer);
-                Map<String, String> headers = new HashMap<String, String>();
-                for (ProxyMessage message : streamIdEntry.getValue()) {
-                    if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
+                if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
+                    for (ProxyMessage message : streamIdEntry.getValue()) {
+                        if (StringUtils.isEmpty(groupId)) {
+                            groupId = message.getGroupId();
+                        }
+                        streamMsgCnt++;
                         message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
                         inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
-                    } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
+                    }
+                } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
+                    for (ProxyMessage message : streamIdEntry.getValue()) {
+                        if (StringUtils.isEmpty(groupId)) {
+                            groupId = message.getGroupId();
+                        }
+                        streamMsgCnt++;
                         inLongMsg.addMsg(message.getData());
-                    } else {
+                    }
+                } else {
+                    for (ProxyMessage message : streamIdEntry.getValue()) {
+                        if (StringUtils.isEmpty(groupId)) {
+                            groupId = message.getGroupId();
+                        }
+                        streamMsgCnt++;
                         inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     }
                 }
-                // get msgTime
-                long currTIme = System.currentTimeMillis();
-                String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
-                long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
-                LocalDateTime localDateTime =
-                        LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
-                String pkgTimeStr = DATE_FORMATTER.format(localDateTime);
-                headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
-                headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
-                // get data time
-                long dtTime = NumberUtils.toLong(
-                        commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme);
-                headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime));
-
+                if (recordMsgCnt != streamMsgCnt) {
+                    logger.debug("Found message count not equal, record={}, calculate value = {}",
+                            recordMsgCnt, streamMsgCnt);
+                }
+                commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(streamMsgCnt));
+                // build headers
+                Map<String, String> headers = new HashMap<>();
+                headers.put(AttributeConstants.GROUP_ID, groupId);
+                headers.put(AttributeConstants.STREAM_ID, streamIdEntry.getKey());
+                headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
+                String strDataTime = commonAttrMap.get(AttributeConstants.DATA_TIME);
+                headers.put(AttributeConstants.DATA_TIME, strDataTime);
+                headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
+                headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
+                headers.put(ConfigConstants.MSG_COUNTER_KEY,
+                        commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
+                headers.put(AttributeConstants.RCV_TIME,
+                        commonAttrMap.get(AttributeConstants.RCV_TIME));
+                // add extra key-value information
+                headers.put(AttributeConstants.UNIQ_ID,
+                        commonAttrMap.get(AttributeConstants.UNIQ_ID));
                 if ("false".equals(commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK))) {
                     headers.put(AttributeConstants.MESSAGE_IS_ACK, "false");
                 }
-
                 String syncSend = commonAttrMap.get(AttributeConstants.MESSAGE_SYNC_SEND);
                 if (StringUtils.isNotEmpty(syncSend)) {
                     headers.put(AttributeConstants.MESSAGE_SYNC_SEND, syncSend);
                 }
-
                 String partitionKey = commonAttrMap.get(AttributeConstants.MESSAGE_PARTITION_KEY);
                 if (StringUtils.isNotEmpty(partitionKey)) {
                     headers.put(AttributeConstants.MESSAGE_PARTITION_KEY, partitionKey);
                 }
-
-                headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
-                headers.put(AttributeConstants.GROUP_ID,
-                        streamIdEntry.getValue().get(0).getGroupId());
-                headers.put(AttributeConstants.STREAM_ID, streamIdEntry.getKey());
-                headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
-                headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
-                // every message share the same msg cnt? what if msgType = 5
-                String proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
-                if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
-                    commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(recordMsgCnt));
-                    proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
-                }
-                headers.put(ConfigConstants.MSG_COUNTER_KEY, proxyMetricMsgCnt);
-
-                byte[] data = inLongMsg.buildArray();
-                headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
-
-                headers.put(AttributeConstants.UNIQ_ID,
-                        commonAttrMap.get(AttributeConstants.UNIQ_ID));
                 String sequenceId = commonAttrMap.get(AttributeConstants.SEQUENCE_ID);
                 if (StringUtils.isNotEmpty(sequenceId)) {
-                    StringBuilder sidBuilder = new StringBuilder();
-                    sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(streamIdEntry.getKey())
+                    strBuff.append(topicEntry.getKey()).append(SEPARATOR)
+                            .append(streamIdEntry.getKey())
                             .append(SEPARATOR).append(sequenceId);
-                    headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
+                    headers.put(ConfigConstants.SEQUENCE_ID, strBuff.toString());
+                    strBuff.delete(0, strBuff.length());
                 }
+                final byte[] data = inLongMsg.buildArray();
                 Event event = EventBuilder.withBody(data, headers);
+                inLongMsg.reset();
+                // build metric data item
                 String orderType = "non-order";
                 if (MessageUtils.isSyncSendForOrder(event)) {
                     event = new OrderEvent(ctx, event);
                     orderType = "order";
                 }
-                long dtten = 0;
-                try {
-                    dtten = Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
-                } catch (Exception e1) {
-                    long uniqVal = Long.parseLong(commonAttrMap.get(AttributeConstants.UNIQ_ID));
-                    throw new MessageIDException(uniqVal,
-                            ErrorCode.DT_ERROR,
-                            new Throwable("attribute dt=" + headers.get(AttributeConstants.DATA_TIME
-                                    + " has error, detail is: topic=" + topicEntry.getKey() + "&streamId="
-                                    + streamIdEntry.getKey() + "&NodeIP=" + strRemoteIP), e1));
-                }
-
-                dtten = dtten / 1000 / 60 / 10;
-                dtten = dtten * 1000 * 60 * 10;
-                StringBuilder newbase = new StringBuilder();
-                newbase.append(protocolType).append(SEPARATOR)
+                long longDataTime = Long.parseLong(strDataTime);
+                longDataTime = longDataTime / 1000 / 60 / 10;
+                longDataTime = longDataTime * 1000 * 60 * 10;
+                strBuff.append(protocolType).append(SEPARATOR)
                         .append(topicEntry.getKey()).append(SEPARATOR)
                         .append(streamIdEntry.getKey()).append(SEPARATOR)
                         .append(strRemoteIP).append(SEPARATOR)
                         .append(NetworkUtils.getLocalIp()).append(SEPARATOR)
                         .append(orderType).append(SEPARATOR)
-                        .append(new SimpleDateFormat("yyyyMMddHHmm")
-                                .format(dtten)).append(SEPARATOR).append(pkgTimeStr);
+                        .append(DateTimeUtils.ms2yyyyMMddHHmm(longDataTime)).append(SEPARATOR)
+                        .append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime));
                 try {
                     processor.processEvent(event);
                     monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
                     this.addMetric(true, data.length, event);
-                    monitorIndex.addAndGet(new String(newbase),
-                            Integer.parseInt(proxyMetricMsgCnt), 1, data.length, 0);
+                    monitorIndex.addAndGet(strBuff.toString(),
+                            streamMsgCnt, 1, data.length, 0);
+                    strBuff.delete(0, strBuff.length());
                 } catch (Throwable ex) {
                     logger.error("Error writting to channel,data will discard.", ex);
                     monitorIndexExt.incrementAndGet("EVENT_DROPPED");
-                    monitorIndex.addAndGet(new String(newbase), 0, 0, 0,
-                            Integer.parseInt(proxyMetricMsgCnt));
+                    monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, streamMsgCnt);
                     this.addMetric(false, data.length, event);
+                    strBuff.delete(0, strBuff.length());
                     throw new ChannelException("ProcessEvent error can't write event to channel.");
                 }
             }
         }
     }
 
-    private void responsePackage(ChannelHandlerContext ctx, Map<String, String> commonAttrMap,
-            Map<String, Object> resultMap,
-            Channel remoteChannel,
-            SocketAddress remoteSocketAddress,
-            MsgType msgType) throws Exception {
+    private void responsePackage(Map<String, String> commonAttrMap,
+                                 Map<String, Object> resultMap,
+                                 Channel remoteChannel,
+                                 MsgType msgType) throws Exception {
         String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK);
         if (isAck == null || "true".equals(isAck)) {
             if (MsgType.MSG_ACK_SERVICE.equals(msgType) || MsgType.MSG_ORIGINAL_RETURN
@@ -571,25 +521,22 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                     }
                 }
             } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
-                String backattrs = null;
-                if (resultMap.containsKey(ConfigConstants.DECODER_ATTRS)) {
-                    backattrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
-                }
+                String backAttrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
                 String uniqVal = commonAttrMap.get(AttributeConstants.UNIQ_ID);
-                ByteBuf binBuffer = MessageUtils.getResponsePackage(backattrs, msgType, uniqVal);
+                ByteBuf binBuffer = MessageUtils.getResponsePackage(backAttrs, msgType, uniqVal);
                 if (remoteChannel.isWritable()) {
                     remoteChannel.writeAndFlush(binBuffer);
                     logger.debug("Connection info: {} ; attr is {} ; uniqVal {}",
-                            remoteChannel, backattrs, uniqVal);
+                            remoteChannel, backAttrs, uniqVal);
                 } else {
                     binBuffer.release();
                     logger.warn(
                             "the send buffer2 is full, so disconnect it!please check remote client"
                                     + "; Connection info:" + remoteChannel + ";attr is "
-                                    + backattrs);
+                                    + backAttrs);
                     throw new Exception(new Throwable(
                             "the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
-                                    + remoteChannel + ";attr is " + backattrs));
+                                    + remoteChannel + ";attr is " + backAttrs));
                 }
             }
         }
@@ -597,9 +544,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        logger.debug("message received");
         if (msg == null) {
-            logger.error("get null msg, just skip");
+            logger.error("Get null msg, just skip!");
             this.addMetric(false, 0, null);
             return;
         }
@@ -609,26 +555,27 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
             String strRemoteIP = getRemoteIp(remoteChannel);
             int len = cb.readableBytes();
             if (len == 0 && this.filterEmptyMsg) {
-                logger.warn("skip empty msg.");
+                logger.warn("Get empty msg from {}, just skip!", strRemoteIP);
                 this.addMetric(false, 0, null);
                 return;
             }
-
+            // parse message
             Map<String, Object> resultMap = null;
+            final long msgRcvTime = System.currentTimeMillis();
             try {
-                resultMap = serviceDecoder.extractData(cb, remoteChannel);
+                resultMap = serviceDecoder.extractData(cb,
+                        strRemoteIP, msgRcvTime, remoteChannel);
+                if (resultMap == null || resultMap.isEmpty()) {
+                    logger.info("Parse message result is null, from {}", strRemoteIP);
+                    this.addMetric(false, 0, null);
+                    return;
+                }
             } catch (MessageIDException ex) {
                 logger.error("MessageIDException ex = {}", ex);
                 this.addMetric(false, 0, null);
                 throw new IOException(ex.getCause());
             }
-
-            if (resultMap == null) {
-                logger.info("result is null");
-                this.addMetric(false, 0, null);
-                return;
-            }
-
+            // process message by msgType
             MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
             if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
                 ByteBuf heartbeatBuffer = ByteBufAllocator.DEFAULT.buffer(5);
@@ -637,90 +584,83 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 this.addMetric(false, 0, null);
                 return;
             }
-
+            // process heart beat 8
             if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
                 this.addMetric(false, 0, null);
                 return;
             }
-
+            // process data message
             Map<String, String> commonAttrMap =
                     (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
             if (commonAttrMap == null) {
                 commonAttrMap = new HashMap<String, String>();
             }
-
             List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
-
             boolean checkMessageTopic = true;
-            if (msgList != null
-                    && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
-                    && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
-                Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
-                        new HashMap<String, HashMap<String, List<ProxyMessage>>>(
-                                msgList.size());
-
-                checkMessageTopic = updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP,
-                        msgType);
-                if (checkMessageTopic) {
-                    formatMessagesAndSend(ctx, commonAttrMap, messageMap,
-                            strRemoteIP, msgType);
-                }
-            } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
-                Map<String, String> headers = new HashMap<String, String>();
-                headers.put("msgtype", "filestatus");
-                headers.put(ConfigConstants.FILE_CHECK_DATA,
-                        "true");
-                headers.put(AttributeConstants.UNIQ_ID,
-                        commonAttrMap.get(AttributeConstants.UNIQ_ID));
-                for (ProxyMessage message : msgList) {
-                    byte[] body = message.getData();
-                    Event event = EventBuilder.withBody(body, headers);
-                    if (MessageUtils.isSyncSendForOrder(commonAttrMap
-                            .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
-                        event = new OrderEvent(ctx, event);
-                    }
-                    try {
-                        processor.processEvent(event);
-                        this.addMetric(true, body.length, event);
-                    } catch (Throwable ex) {
-                        logger.error("Error writing to controller,data will discard.", ex);
-                        this.addMetric(false, body.length, event);
-                        throw new ChannelException(
-                                "Process Controller Event error can't write event to channel.");
+            if (msgList != null) {
+                if (commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
+                    // process file check data
+                    Map<String, String> headers = new HashMap<String, String>();
+                    headers.put("msgtype", "filestatus");
+                    headers.put(ConfigConstants.FILE_CHECK_DATA, "true");
+                    headers.put(AttributeConstants.UNIQ_ID,
+                            commonAttrMap.get(AttributeConstants.UNIQ_ID));
+                    for (ProxyMessage message : msgList) {
+                        byte[] body = message.getData();
+                        Event event = EventBuilder.withBody(body, headers);
+                        if (MessageUtils.isSyncSendForOrder(commonAttrMap
+                                .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+                            event = new OrderEvent(ctx, event);
+                        }
+                        try {
+                            processor.processEvent(event);
+                            this.addMetric(true, body.length, event);
+                        } catch (Throwable ex) {
+                            logger.error("Error writing to controller,data will discard.", ex);
+                            this.addMetric(false, body.length, event);
+                            throw new ChannelException(
+                                    "Process Controller Event error can't write event to channel.");
+                        }
                     }
-                }
-            } else if (msgList != null && commonAttrMap
-                    .containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
-                logger.info("i am in MINUTE_CHECK_DATA");
-                Map<String, String> headers = new HashMap<String, String>();
-                headers.put("msgtype", "measure");
-                headers.put(ConfigConstants.FILE_CHECK_DATA,
-                        "true");
-                headers.put(AttributeConstants.UNIQ_ID,
-                        commonAttrMap.get(AttributeConstants.UNIQ_ID));
-                for (ProxyMessage message : msgList) {
-                    byte[] body = message.getData();
-                    Event event = EventBuilder.withBody(body, headers);
-                    if (MessageUtils.isSyncSendForOrder(commonAttrMap
-                            .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
-                        event = new OrderEvent(ctx, event);
+                } else if (commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
+                    // process minute check data
+                    Map<String, String> headers = new HashMap<String, String>();
+                    headers.put("msgtype", "measure");
+                    headers.put(ConfigConstants.FILE_CHECK_DATA, "true");
+                    headers.put(AttributeConstants.UNIQ_ID,
+                            commonAttrMap.get(AttributeConstants.UNIQ_ID));
+                    for (ProxyMessage message : msgList) {
+                        byte[] body = message.getData();
+                        Event event = EventBuilder.withBody(body, headers);
+                        if (MessageUtils.isSyncSendForOrder(commonAttrMap
+                                .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+                            event = new OrderEvent(ctx, event);
+                        }
+                        try {
+                            processor.processEvent(event);
+                            this.addMetric(true, body.length, event);
+                        } catch (Throwable ex) {
+                            logger.error("Error writing to controller,data will discard.", ex);
+                            this.addMetric(false, body.length, event);
+                            throw new ChannelException(
+                                    "Process Controller Event error can't write event to channel.");
+                        }
                     }
-                    try {
-                        processor.processEvent(event);
-                        this.addMetric(true, body.length, event);
-                    } catch (Throwable ex) {
-                        logger.error("Error writing to controller,data will discard.", ex);
-                        this.addMetric(false, body.length, event);
-                        throw new ChannelException(
-                                "Process Controller Event error can't write event to channel.");
+                } else {
+                    // process message data
+                    Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
+                            new HashMap<>(msgList.size());
+                    checkMessageTopic = updateMsgList(msgList,
+                            commonAttrMap, messageMap, strRemoteIP);
+                    if (checkMessageTopic) {
+                        formatMessagesAndSend(ctx, commonAttrMap,
+                                messageMap, strRemoteIP, msgType, msgRcvTime);
                     }
                 }
             }
-            SocketAddress remoteSocketAddress = remoteChannel.remoteAddress();
             if (!checkMessageTopic || !MessageUtils.isSyncSendForOrder(commonAttrMap
                     .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
-                responsePackage(ctx, commonAttrMap, resultMap, remoteChannel,
-                        remoteSocketAddress, msgType);
+                responsePackage(commonAttrMap, resultMap, remoteChannel, msgType);
             }
         } finally {
             cb.release();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
index 7f4852b5c..3f13f7e1d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
@@ -28,10 +28,13 @@ public interface ServiceDecoder {
     /**
      * extract data from buffer and convert it into map.
      * 
-     * @param cb
-     * @param channel
-     * @return Map
+     * @param cb           the message Byte buffer
+     * @param strRemoteIP  the remote ip message sent
+     * @param msgRcvTime   the received message time
+     * @param channel      the channel
+     * @return Map         the message map
      * @throws Exception
      */
-    Map<String, Object> extractData(ByteBuf cb, Channel channel) throws Exception;
+    Map<String, Object> extractData(ByteBuf cb, String strRemoteIP,
+                                    long msgRcvTime, Channel channel) throws Exception;
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 38ca9ee32..71372da18 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -570,6 +570,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
             return;
         }
         Channel remoteChannel = ctx.channel();
+        String strRemoteIP = getRemoteIp(remoteChannel);
         ByteBuf cb = (ByteBuf) msg;
         try {
             int len = cb.readableBytes();
@@ -580,8 +581,10 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
                 return;
             }
             Map<String, Object> resultMap = null;
+            final long msgRcvTime = System.currentTimeMillis();
             try {
-                resultMap = serviceProcessor.extractData(cb, remoteChannel);
+                resultMap = serviceProcessor.extractData(cb,
+                        strRemoteIP, msgRcvTime, remoteChannel);
             } catch (MessageIDException ex) {
                 this.addMetric(false, 0, null);
                 throw new IOException(ex.getCause());
@@ -617,7 +620,6 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
                     && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
                     && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
                 Map<String, HashMap<String, List<ProxyMessage>>> messageMap = new HashMap<>(msgList.size());
-                String strRemoteIP = getRemoteIp(remoteChannel);
                 updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
 
                 formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);