You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 10:57:18 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5880][DataProxy] Add data reporting time process logic (#5882)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new ac9c9de90 [INLONG-5880][DataProxy] Add data reporting time process logic (#5882)
ac9c9de90 is described below

commit ac9c9de90fca19b4091a6def4803527cf8ad1936
Author: Goson Zhang <46...@qq.com>
AuthorDate: Wed Sep 14 11:26:21 2022 +0800

    [INLONG-5880][DataProxy] Add data reporting time process logic (#5882)
---
 .../dataproxy/http/SimpleMessageHandler.java       |  9 +++-
 .../inlong/dataproxy/metrics/audit/AuditUtils.java |  8 +++-
 .../dataproxy/source/DefaultServiceDecoder.java    |  6 +--
 .../dataproxy/source/ServerMessageHandler.java     | 43 +++++++++---------
 .../dataproxy/source/SimpleMessageHandler.java     | 51 +++++++++-------------
 5 files changed, 60 insertions(+), 57 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 0eeb614da..1969f7cea 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -28,6 +28,7 @@ import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
@@ -44,6 +45,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.source.ServiceDecoder;
+import org.apache.inlong.dataproxy.utils.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,9 +138,14 @@ public class SimpleMessageHandler implements MessageHandler {
         headers.put(ConfigConstants.MSG_COUNTER_KEY, msgCount);
         byte[] data = inLongMsg.buildArray();
         headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
+        // use the msgTime request parameter if it is a valid long; otherwise fall back to now
+        long currTIme = System.currentTimeMillis();
+        String strMsgTime = request.getParameter(Constants.HEADER_KEY_MSG_TIME);
+        long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
         LocalDateTime localDateTime =
-                LocalDateTime.ofInstant(Instant.ofEpochMilli(inLongMsg.getCreatetime()), defZoneId);
+                LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
         String pkgTime = DATE_FORMATTER.format(localDateTime);
+        headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
         headers.put(ConfigConstants.PKG_TIME_KEY, pkgTime);
         Event event = EventBuilder.withBody(data, headers);
         inLongMsg.reset();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index f2bbbe52e..f7e9ae6b6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -25,6 +25,7 @@ import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.audit.util.AuditConfig;
 import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.utils.Constants;
 
@@ -88,7 +89,12 @@ public class AuditUtils {
             String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers);
             String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers);
             long logTime = getLogTime(headers);
-            AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+            long msgCount = 1L;
+            if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
+                msgCount = Long.parseLong(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
+            }
+            AuditImp.getInstance().add(auditID, inlongGroupId,
+                    inlongStreamId, logTime, msgCount, event.getBody().length);
         }
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index cb4497cbe..355dc7a2e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -26,6 +26,7 @@ import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.exception.ErrorCode;
 import org.apache.inlong.dataproxy.exception.MessageIDException;
+import org.apache.inlong.dataproxy.utils.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xerial.snappy.Snappy;
@@ -112,9 +113,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
             long uniq, long dataTime, int msgCount) {
         commonAttrMap.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
         String time = "";
-        if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
-            time = commonAttrMap
-                    .get(ConfigConstants.PKG_TIME_KEY);
+        if (commonAttrMap.containsKey(Constants.HEADER_KEY_MSG_TIME)) {
+            time = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
         } else {
             time = String.valueOf(dataTime);
         }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index d79143260..9d3b32c4e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.group.ChannelGroup;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
@@ -44,6 +45,7 @@ import org.apache.inlong.dataproxy.exception.MessageIDException;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
+import org.apache.inlong.dataproxy.utils.Constants;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.inlong.dataproxy.utils.NetworkUtils;
 import org.slf4j.Logger;
@@ -54,6 +56,10 @@ import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -85,8 +91,9 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
             .on(AttributeConstants.SEPARATOR)
             .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
 
-    private static final ThreadLocal<SimpleDateFormat> dateFormator =
-            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmm"));
+    private static final DateTimeFormatter DATE_FORMATTER
+            = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+    private static final ZoneId defZoneId = ZoneId.systemDefault();
 
     private AbstractSource source;
 
@@ -416,23 +423,19 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                         inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     }
                 }
-
-                long pkgTimeInMillis = inLongMsg.getCreatetime();
-                String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis);
-
-                if (inLongMsgVer == 4) {
-                    if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
-                        pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY);
-                    } else {
-                        pkgTimeStr = dateFormator.get().format(System.currentTimeMillis());
-                    }
-                }
-
-                if (commonAttrMap.get(AttributeConstants.DATA_TIME) != null) {
-                    headers.put(AttributeConstants.DATA_TIME, commonAttrMap.get(AttributeConstants.DATA_TIME));
-                } else {
-                    headers.put(AttributeConstants.DATA_TIME, String.valueOf(System.currentTimeMillis()));
-                }
+                // get msgTime: use the reported value if it is a valid long, otherwise fall back to now
+                long currTIme = System.currentTimeMillis();
+                String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
+                long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
+                LocalDateTime localDateTime =
+                        LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
+                String pkgTimeStr = DATE_FORMATTER.format(localDateTime);
+                headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
+                headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
+                // get data time
+                long dtTime = NumberUtils.toLong(
+                        commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme);
+                headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime));
 
                 if ("false".equals(commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK))) {
                     headers.put(AttributeConstants.MESSAGE_IS_ACK, "false");
@@ -474,8 +477,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                             .append(SEPARATOR).append(sequenceId);
                     headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
                 }
-
-                headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
                 Event event = EventBuilder.withBody(data, headers);
                 String orderType = "non-order";
                 if (MessageUtils.isSyncSendForOrder(event)) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 3cc105aa7..38ca9ee32 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -32,7 +32,10 @@ import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -79,20 +82,10 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
             .on(AttributeConstants.SEPARATOR)
             .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
 
-    private static final ThreadLocal<SimpleDateFormat> dateFormator = new ThreadLocal<SimpleDateFormat>() {
+    private static final DateTimeFormatter DATE_FORMATTER
+            = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+    private static final ZoneId defZoneId = ZoneId.systemDefault();
 
-        @Override
-        protected SimpleDateFormat initialValue() {
-            return new SimpleDateFormat("yyyyMMddHHmm");
-        }
-    };
-    private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer = new ThreadLocal<SimpleDateFormat>() {
-
-        @Override
-        protected SimpleDateFormat initialValue() {
-            return new SimpleDateFormat("yyyyMMddHHmmss");
-        }
-    };
     private AbstractSource source;
     private final ChannelGroup allChannels;
     private int maxConnections = Integer.MAX_VALUE;
@@ -392,20 +385,18 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
                         inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     }
                 }
-
-                long pkgTimeInMillis = inLongMsg.getCreatetime();
-                String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis);
-
-                if (inLongMsgVer == 4) {
-                    if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
-                        pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY);
-                    } else {
-                        pkgTimeStr = dateFormator.get().format(System.currentTimeMillis());
-                    }
-                }
-
-                long dtTime = NumberUtils.toLong(commonAttrMap.get(AttributeConstants.DATA_TIME),
-                        System.currentTimeMillis());
+                // get msgTime: use the reported value if it is a valid long, otherwise fall back to now
+                long currTIme = System.currentTimeMillis();
+                String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
+                long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
+                LocalDateTime localDateTime =
+                        LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
+                String pkgTimeStr = DATE_FORMATTER.format(localDateTime);
+                headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
+                headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
+                // get data time
+                long dtTime = NumberUtils.toLong(
+                        commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme);
                 headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime));
 
                 headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
@@ -428,8 +419,6 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
                     headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
                 }
 
-                headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
-
                 // process proxy message list
                 this.processProxyMessageList(headers, streamIdEntry.getValue());
             }
@@ -474,7 +463,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
         headers.put(Constants.INLONG_GROUP_ID, proxyMessage.getGroupId());
         headers.put(Constants.INLONG_STREAM_ID, proxyMessage.getStreamId());
         headers.put(Constants.TOPIC, proxyMessage.getTopic());
-        headers.put(Constants.HEADER_KEY_MSG_TIME, commonHeaders.get(AttributeConstants.DATA_TIME));
+        //headers.put(Constants.HEADER_KEY_MSG_TIME, commonHeaders.get(AttributeConstants.DATA_TIME));
         headers.put(Constants.HEADER_KEY_SOURCE_IP, commonHeaders.get(AttributeConstants.NODE_IP));
         Event event = EventBuilder.withBody(proxyMessage.getData(), headers);
         return event;