You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/17 07:10:43 UTC

[inlong] branch master updated: [INLONG-5917][DataProxy] Optimize TubeSink class (#5920)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 64c20a36c [INLONG-5917][DataProxy] Optimize TubeSink class (#5920)
64c20a36c is described below

commit 64c20a36c179bfecc300618cc63c60401b04b741
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sat Sep 17 15:10:37 2022 +0800

    [INLONG-5917][DataProxy] Optimize TubeSink class (#5920)
---
 .../inlong/dataproxy/consts/ConfigConstants.java   |   2 +
 .../dataproxy/http/SimpleMessageHandler.java       |   2 +
 .../inlong/dataproxy/metrics/audit/AuditUtils.java |  23 ++-
 .../dataproxy/sink/SimpleMessageTubeSink.java      |   3 +-
 .../org/apache/inlong/dataproxy/sink/TubeSink.java | 162 ++++++++-------------
 .../inlong/dataproxy/sink/common/TubeUtils.java    |  39 +++--
 .../dataproxy/source/ServerMessageHandler.java     |   2 +
 .../dataproxy/source/SimpleMessageHandler.java     |   4 +-
 .../inlong/dataproxy/utils/InLongMsgVer.java       |  54 +++++++
 .../inlong/dataproxy/utils/MessageUtils.java       |  34 +++++
 10 files changed, 192 insertions(+), 133 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index dd79a9bb0..1a6fcada1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -75,6 +75,8 @@ public class ConfigConstants {
 
     public static final String TOPIC_KEY = "topic";
     public static final String REMOTE_IP_KEY = "srcIp";
+    public static final String DATAPROXY_IP_KEY = "dpIp";
+    public static final String MSG_ENCODE_VER = "msgEnType";
     public static final String REMOTE_IDC_KEY = "idc";
     public static final String MSG_COUNTER_KEY = "msgcnt";
     public static final String PKG_COUNTER_KEY = "pkgcnt";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 0689039e1..16bb5c917 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -41,6 +41,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.source.ServiceDecoder;
 import org.apache.inlong.dataproxy.utils.DateTimeUtils;
+import org.apache.inlong.dataproxy.utils.InLongMsgVer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -152,6 +153,7 @@ public class SimpleMessageHandler implements MessageHandler {
         headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
         headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
         headers.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
+        headers.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V0.getName());
         byte[] data = inLongMsg.buildArray();
         headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
         Event event = EventBuilder.withBody(data, headers);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index f7e9ae6b6..24c251626 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.dataproxy.metrics.audit;
 
+import java.util.HashSet;
+import java.util.Map;
 import org.apache.commons.lang3.BooleanUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -28,9 +30,7 @@ import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.utils.Constants;
-
-import java.util.HashSet;
-import java.util.Map;
+import org.apache.inlong.dataproxy.utils.InLongMsgVer;
 
 /**
  * 
@@ -79,13 +79,17 @@ public class AuditUtils {
 
     /**
      * add
-     * 
+     *
      * @param auditID
      * @param event
      */
     public static void add(int auditID, Event event) {
-        if (IS_AUDIT && event != null) {
-            Map<String, String> headers = event.getHeaders();
+        if (!IS_AUDIT || event == null) {
+            return;
+        }
+        Map<String, String> headers = event.getHeaders();
+        String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER);
+        if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
             String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers);
             String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers);
             long logTime = getLogTime(headers);
@@ -95,6 +99,13 @@ public class AuditUtils {
             }
             AuditImp.getInstance().add(auditID, inlongGroupId,
                     inlongStreamId, logTime, msgCount, event.getBody().length);
+        } else {
+            String groupId = headers.get(AttributeConstants.GROUP_ID);
+            String streamId = headers.get(AttributeConstants.STREAM_ID);
+            long dataTime = NumberUtils.toLong(headers.get(AttributeConstants.DATA_TIME));
+            long msgCount = NumberUtils.toLong(headers.get(ConfigConstants.MSG_COUNTER_KEY));
+            AuditImp.getInstance().add(auditID, groupId,
+                    streamId, dataTime, msgCount, event.getBody().length);
         }
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
index a0e996029..28dea521b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
@@ -354,8 +354,7 @@ public class SimpleMessageTubeSink extends AbstractSink implements Configurable
                 logger.info("{} agent package {} existed,just discard.",
                         getName(), event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
             } else {
-                producer.sendMessage(TubeUtils.buildMessage(
-                        topic, event, true), new MyCallback(es));
+                producer.sendMessage(TubeUtils.buildMessage(topic, event), new MyCallback(es));
                 flag.set(true);
             }
             illegalTopicMap.remove(topic);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 0f96aa0b9..42e90ed0f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -17,10 +17,22 @@
 
 package org.apache.inlong.dataproxy.sink;
 
+import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
+
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.collections.SetUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -47,7 +59,7 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
 import org.apache.inlong.dataproxy.sink.common.TubeProducerHolder;
 import org.apache.inlong.dataproxy.sink.common.TubeUtils;
-import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
 import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
 import org.apache.inlong.tubemq.client.producer.MessageProducer;
@@ -57,26 +69,11 @@ import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
-
 public class TubeSink extends AbstractSink implements Configurable {
 
     private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
     private static final MsgDedupHandler MSG_DEDUP_HANDLER = new MsgDedupHandler();
     private TubeProducerHolder producerHolder = null;
-    private static final String TOPIC = "topic";
     private volatile boolean canTake = false;
     private volatile boolean canSend = false;
     private volatile boolean isOverFlow = false;
@@ -276,12 +273,8 @@ public class TubeSink extends AbstractSink implements Configurable {
                     diskRateLimiter.acquire(event.getBody().length);
                 }
                 Map<String, String> dimensions;
-                if (event.getHeaders().containsKey(TOPIC)) {
-                    dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
-                            event.getHeaders().get(TOPIC));
-                } else {
-                    dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
-                }
+                dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
+                        event.getHeaders().getOrDefault(ConfigConstants.TOPIC_KEY, ""));
                 if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
                     tx.commit();
                     cachedMsgCnt.incrementAndGet();
@@ -335,20 +328,8 @@ public class TubeSink extends AbstractSink implements Configurable {
                         isOverFlow = false;
                         Thread.sleep(30);
                     }
-                    event = null;
-                    topic = null;
                     // get event from queues
-                    if (!resendQueue.isEmpty()) {
-                        es = resendQueue.poll();
-                        if (es == null) {
-                            continue;
-                        }
-                        resendMsgCnt.decrementAndGet();
-                        event = es.getEvent();
-                        if (event.getHeaders().containsKey(TOPIC)) {
-                            topic = event.getHeaders().get(TOPIC);
-                        }
-                    } else {
+                    if (resendQueue.isEmpty()) {
                         event = eventQueue.poll(2000, TimeUnit.MILLISECONDS);
                         if (event == null) {
                             if (!canTake && takenMsgCnt.get() <= 0) {
@@ -360,10 +341,15 @@ public class TubeSink extends AbstractSink implements Configurable {
                         cachedMsgCnt.decrementAndGet();
                         takenMsgCnt.incrementAndGet();
                         es = new EventStat(event);
-                        if (event.getHeaders().containsKey(TOPIC)) {
-                            topic = event.getHeaders().get(TOPIC);
+                    } else {
+                        es = resendQueue.poll();
+                        if (es == null) {
+                            continue;
                         }
+                        resendMsgCnt.decrementAndGet();
+                        event = es.getEvent();
                     }
+                    topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
                     // valid event status
                     if (StringUtils.isBlank(topic)) {
                         blankTopicDiscardMsgCnt.incrementAndGet();
@@ -427,8 +413,7 @@ public class TubeSink extends AbstractSink implements Configurable {
                         event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
                 return false;
             } else {
-                producer.sendMessage(TubeUtils.buildMessage(
-                        topic, event, false), new MyCallback(es));
+                producer.sendMessage(TubeUtils.buildMessage(topic, event), new MyCallback(es));
                 inflightMsgCnt.incrementAndGet();
                 return true;
             }
@@ -493,7 +478,8 @@ public class TubeSink extends AbstractSink implements Configurable {
             Map<String, String> dimensions = new HashMap<>();
             dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, TubeSink.this.getName());
             dimensions.put(DataProxyMetricItem.KEY_SINK_ID, TubeSink.this.getName());
-            dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, ""));
+            dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
+                    event.getHeaders().get(ConfigConstants.TOPIC_KEY));
             DataProxyMetricItem.fillInlongId(event, dimensions);
             DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
             DataProxyMetricItem metricItem = TubeSink.this.metricItemSet.findMetricItem(dimensions);
@@ -503,14 +489,13 @@ public class TubeSink extends AbstractSink implements Configurable {
                 AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
                 if (sendTime > 0) {
                     long currentTime = System.currentTimeMillis();
-                    long msgTime = NumberUtils.toLong(event.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
-                            sendTime);
-                    long sinkDuration = currentTime - sendTime;
-                    long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
-                    long wholeDuration = currentTime - msgTime;
-                    metricItem.sinkDuration.addAndGet(sinkDuration);
-                    metricItem.nodeDuration.addAndGet(nodeDuration);
-                    metricItem.wholeDuration.addAndGet(wholeDuration);
+                    long msgDataTimeL = Long.parseLong(
+                            event.getHeaders().get(AttributeConstants.DATA_TIME));
+                    long msgRcvTimeL = Long.parseLong(
+                            event.getHeaders().get(AttributeConstants.RCV_TIME));
+                    metricItem.sinkDuration.addAndGet(currentTime - sendTime);
+                    metricItem.nodeDuration.addAndGet(currentTime - msgRcvTimeL);
+                    metricItem.wholeDuration.addAndGet(currentTime - msgDataTimeL);
                 }
             } else {
                 metricItem.sendFailCount.incrementAndGet();
@@ -519,60 +504,31 @@ public class TubeSink extends AbstractSink implements Configurable {
         }
 
         private void editStatistic(final Event event, boolean isSuccess) {
-            String topic = "";
-            String streamId = "";
-            String nodeIp;
-            if (event != null) {
-                if (event.getHeaders().containsKey(TOPIC)) {
-                    topic = event.getHeaders().get(TOPIC);
-                }
-                if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
-                    streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
-                } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
-                    streamId = event.getHeaders().get(AttributeConstants.INAME);
-                }
-                // Compatible agent
-                if (event.getHeaders().containsKey("ip")) {
-                    event.getHeaders().put(ConfigConstants.REMOTE_IP_KEY, event.getHeaders().get("ip"));
-                    event.getHeaders().remove("ip");
-                }
-                // Compatible agent
-                if (event.getHeaders().containsKey("time")) {
-                    event.getHeaders().put(AttributeConstants.DATA_TIME, event.getHeaders().get("time"));
-                    event.getHeaders().remove("time");
-                }
-                if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IP_KEY)) {
-                    nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
-                    if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IDC_KEY)) {
-                        if (nodeIp != null) {
-                            nodeIp = nodeIp.split(":")[0];
-                        }
-                        long msgCounterL = 1L;
-                        // msg counter
-                        if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
-                            msgCounterL = Integer.parseInt(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
-                        }
-                        StringBuilder newBase = new StringBuilder();
-                        newBase.append(getName()).append(SEP_HASHTAG).append(topic)
-                                .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
-                                .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
-                                .append(SEP_HASHTAG).append("non-order").append(SEP_HASHTAG)
-                                .append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-                        long messageSize = event.getBody().length;
-                        if (event.getHeaders().get(ConfigConstants.TOTAL_LEN) != null) {
-                            messageSize = Long.parseLong(event.getHeaders().get(ConfigConstants.TOTAL_LEN));
-                        }
-                        if (statIntervalSec > 0) {
-                            if (isSuccess) {
-                                monitorIndex.addAndGet(new String(newBase),
-                                        (int) msgCounterL, 1, messageSize, 0);
-                            } else {
-                                monitorIndex.addAndGet(new String(newBase),
-                                        0, 0, 0, (int) msgCounterL);
-                            }
-                        }
-                    }
-                }
+            if (event == null || statIntervalSec <= 0) {
+                return;
+            }
+            // get statistic items
+            String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
+            String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
+            String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
+            int intMsgCnt = Integer.parseInt(
+                    event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
+            long dataTimeL = Long.parseLong(
+                    event.getHeaders().get(AttributeConstants.DATA_TIME));
+            // build statistic key
+            StringBuilder newBase = new StringBuilder(512);
+            newBase.append(getName()).append(SEP_HASHTAG).append(topic)
+                    .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
+                    .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
+                    .append(SEP_HASHTAG).append("non-order").append(SEP_HASHTAG)
+                    .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+            // count data
+            if (isSuccess) {
+                monitorIndex.addAndGet(newBase.toString(),
+                        intMsgCnt, 1, event.getBody().length, 0);
+            } else {
+                monitorIndex.addAndGet(newBase.toString(),
+                        0, 0, 0, intMsgCnt);
             }
         }
     }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
index c1e11cf9d..16c10e384 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
@@ -17,17 +17,18 @@
 
 package org.apache.inlong.dataproxy.sink.common;
 
+import java.util.Map;
 import org.apache.flume.Event;
-import org.apache.inlong.common.util.NetworkUtils;
 import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
+import org.apache.inlong.dataproxy.utils.InLongMsgVer;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
 import org.apache.inlong.tubemq.corebase.Message;
 
-import java.util.Map;
-
 public class TubeUtils {
 
     /**
@@ -53,28 +54,24 @@ public class TubeUtils {
      *
      * @param topicName      the topic name of message
      * @param event          the DataProxy event
-     * @param addExtraAttrs  whether to add extra attributes
      * @return   the message object
      */
-    public static Message buildMessage(String topicName,
-                                       Event event, boolean addExtraAttrs) {
+    public static Message buildMessage(String topicName, Event event) {
+        Map<String, String> headers = event.getHeaders();
         Message message = new Message(topicName, event.getBody());
-        message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
-        String streamId = "";
-        if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
-            streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
-        } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
-            streamId = event.getHeaders().get(AttributeConstants.INAME);
+        String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER);
+        if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
+            String streamId = headers.get(Constants.INLONG_STREAM_ID);
+            message.putSystemHeader(streamId,
+                    headers.get(ConfigConstants.PKG_TIME_KEY));
+        } else {
+            long dataTimeL = Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
+            message.putSystemHeader(headers.get(AttributeConstants.STREAM_ID),
+                    DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
         }
-        message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-        if (addExtraAttrs) {
-            // common attributes
-            Map<String, String> headers = event.getHeaders();
-            message.setAttrKeyVal(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID));
-            message.setAttrKeyVal(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID));
-            message.setAttrKeyVal(Constants.TOPIC, headers.get(Constants.TOPIC));
-            message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME));
-            message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP));
+        Map<String, String> extraAttrMap = MessageUtils.getXfsAttrs(headers, pkgVersion);
+        for (Map.Entry<String, String> entry : extraAttrMap.entrySet()) {
+            message.setAttrKeyVal(entry.getKey(), entry.getValue());
         }
         return message;
     }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 61a146b25..6e92cb6ba 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -57,6 +57,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.utils.DateTimeUtils;
+import org.apache.inlong.dataproxy.utils.InLongMsgVer;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -412,6 +413,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
                 headers.put(ConfigConstants.MSG_COUNTER_KEY,
                         commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
+                headers.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V0.getName());
                 headers.put(AttributeConstants.RCV_TIME,
                         commonAttrMap.get(AttributeConstants.RCV_TIME));
                 // add extra key-value information
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 71372da18..1e2d54405 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -59,6 +59,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.InLongMsgVer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -463,8 +464,9 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
         headers.put(Constants.INLONG_GROUP_ID, proxyMessage.getGroupId());
         headers.put(Constants.INLONG_STREAM_ID, proxyMessage.getStreamId());
         headers.put(Constants.TOPIC, proxyMessage.getTopic());
-        //headers.put(Constants.HEADER_KEY_MSG_TIME, commonHeaders.get(AttributeConstants.DATA_TIME));
+        headers.put(Constants.HEADER_KEY_MSG_TIME, commonHeaders.get(AttributeConstants.DATA_TIME));
         headers.put(Constants.HEADER_KEY_SOURCE_IP, commonHeaders.get(AttributeConstants.NODE_IP));
+        headers.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V1.getName());
         Event event = EventBuilder.withBody(proxyMessage.getData(), headers);
         return event;
     }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java
new file mode 100644
index 000000000..626398df6
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+public enum InLongMsgVer {
+    INLONG_V0(0, "V0", "The inlong-msg V0 format"),
+    INLONG_V1(1, "V1", "The inlong-msg V1 format");
+
+    InLongMsgVer(int id, String name, String desc) {
+        this.id = id;
+        this.name = name;
+        this.desc = desc;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+
+    public static InLongMsgVer valueOf(int value) {
+        for (InLongMsgVer inLongMsgVer : InLongMsgVer.values()) {
+            if (inLongMsgVer.getId() == value) {
+                return inLongMsgVer;
+            }
+        }
+        return INLONG_V0;
+    }
+
+    private final int id;
+    private final String name;
+    private final String desc;
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
index 4e9fd2395..d8835658d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
@@ -18,10 +18,13 @@ package org.apache.inlong.dataproxy.utils;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Event;
+import org.apache.inlong.common.util.NetworkUtils;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.source.MsgType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,4 +110,35 @@ public class MessageUtils {
         return topic;
     }
 
+    public static Map<String, String> getXfsAttrs(Map<String, String> headers, String pkgVersion) {
+        // common attributes
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(ConfigConstants.MSG_ENCODE_VER, pkgVersion);
+        if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
+            attrs.put("dataproxyip", NetworkUtils.getLocalIp());
+            attrs.put(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID));
+            attrs.put(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID));
+            attrs.put(Constants.TOPIC, headers.get(Constants.TOPIC));
+            attrs.put(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME));
+            attrs.put(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP));
+        } else {
+            // non-V1 package: read the AttributeConstants/ConfigConstants header keys
+            attrs.put(Constants.INLONG_GROUP_ID,
+                    headers.get(AttributeConstants.GROUP_ID));
+            attrs.put(Constants.INLONG_STREAM_ID,
+                    headers.get(AttributeConstants.STREAM_ID));
+            attrs.put(Constants.TOPIC,
+                    headers.get(ConfigConstants.TOPIC_KEY));
+            attrs.put(Constants.HEADER_KEY_MSG_TIME,
+                    headers.get(AttributeConstants.DATA_TIME));
+            attrs.put(Constants.HEADER_KEY_SOURCE_IP,
+                    headers.get(ConfigConstants.REMOTE_IP_KEY));
+            attrs.put(Constants.HEADER_KEY_SOURCE_TIME,
+                    headers.get(AttributeConstants.RCV_TIME));
+            attrs.put(ConfigConstants.DATAPROXY_IP_KEY,
+                    NetworkUtils.getLocalIp());
+        }
+        return attrs;
+    }
+
 }