You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/19 11:26:43 UTC

[inlong] branch master updated: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items(addendum) (#5929)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cdd279ea8 [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items(addendum) (#5929)
cdd279ea8 is described below

commit cdd279ea867e523b346809ac006dab5aafcee790
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Sep 19 19:26:38 2022 +0800

    [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items(addendum) (#5929)
---
 .../inlong/dataproxy/sink/common/TubeUtils.java    |  6 +-
 .../dataproxy/source/ServerMessageFactory.java     |  7 +--
 .../dataproxy/source/SimpleMessageHandler.java     | 67 ++++++++++++----------
 3 files changed, 44 insertions(+), 36 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
index 16c10e384..c1ed4b44d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
@@ -61,9 +61,9 @@ public class TubeUtils {
         Message message = new Message(topicName, event.getBody());
         String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER);
         if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
-            String streamId = headers.get(Constants.INLONG_STREAM_ID);
-            message.putSystemHeader(streamId,
-                    headers.get(ConfigConstants.PKG_TIME_KEY));
+            long dataTimeL = Long.parseLong(headers.get(ConfigConstants.PKG_TIME_KEY));
+            message.putSystemHeader(headers.get(Constants.INLONG_STREAM_ID),
+                    DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
         } else {
             long dataTimeL = Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
             message.putSystemHeader(headers.get(AttributeConstants.STREAM_ID),
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index 69e891396..93a2a8c95 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -26,7 +26,6 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
 import java.lang.reflect.Constructor;
 import java.util.concurrent.TimeUnit;
 import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
@@ -46,7 +45,7 @@ public class ServerMessageFactory
 
     private static int MSG_LENGTH_LEN = 4;
 
-    private AbstractSource source;
+    private BaseSource source;
 
     private ChannelProcessor processor;
 
@@ -93,7 +92,7 @@ public class ServerMessageFactory
      * @param monitorIndexExt
      * @param name
      */
-    public ServerMessageFactory(AbstractSource source, ChannelGroup allChannels, String protocol,
+    public ServerMessageFactory(BaseSource source, ChannelGroup allChannels, String protocol,
             ServiceDecoder serviceDecoder, String messageHandlerName, Integer maxMsgLength,
             String topic, String attr, Boolean filterEmptyMsg, Integer maxCons,
             Boolean isCompressed, MonitorIndex monitorIndex, MonitorIndexExt monitorIndexExt,
@@ -135,7 +134,7 @@ public class ServerMessageFactory
                         .forName(messageHandlerName);
 
                 Constructor<?> ctor = clazz.getConstructor(
-                        AbstractSource.class, ServiceDecoder.class, ChannelGroup.class,
+                        BaseSource.class, ServiceDecoder.class, ChannelGroup.class,
                         String.class, String.class, Boolean.class,
                         Integer.class, Boolean.class, MonitorIndex.class,
                         MonitorIndexExt.class, String.class);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index b101b9651..ea2691980 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -22,26 +22,24 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA
 import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
 import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.group.ChannelGroup;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.ChannelException;
@@ -62,9 +60,6 @@ import org.apache.inlong.dataproxy.utils.InLongMsgVer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-
 /**
  * Server message handler
  *
@@ -82,10 +77,20 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
             .on(AttributeConstants.SEPARATOR)
             .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
 
-    private static final DateTimeFormatter DATE_FORMATTER
-            = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
-    private static final ZoneId defZoneId = ZoneId.systemDefault();
+    private static final ThreadLocal<SimpleDateFormat> dateFormator = new ThreadLocal<SimpleDateFormat>() {
+
+        @Override
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyyMMddHHmm");
+        }
+    };
+    private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer = new ThreadLocal<SimpleDateFormat>() {
 
+        @Override
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyyMMddHHmmss");
+        }
+    };
     private BaseSource source;
     private final ChannelGroup allChannels;
     private int maxConnections = Integer.MAX_VALUE;
@@ -381,18 +386,20 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
                         inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     }
                 }
-                // get msgTime
-                long currTIme = System.currentTimeMillis();
-                String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
-                long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
-                LocalDateTime localDateTime =
-                        LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
-                String pkgTimeStr = DATE_FORMATTER.format(localDateTime);
-                headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
-                headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
-                // get data time
-                long dtTime = NumberUtils.toLong(
-                        commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme);
+
+                long pkgTimeInMillis = inLongMsg.getCreatetime();
+                String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis);
+
+                if (inLongMsgVer == 4) {
+                    if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
+                        pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY);
+                    } else {
+                        pkgTimeStr = dateFormator.get().format(System.currentTimeMillis());
+                    }
+                }
+
+                long dtTime = NumberUtils.toLong(commonAttrMap.get(AttributeConstants.DATA_TIME),
+                        System.currentTimeMillis());
                 headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime));
 
                 headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
@@ -415,6 +422,8 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
                     headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
                 }
 
+                headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
+
                 // process proxy message list
                 this.processProxyMessageList(headers, streamIdEntry.getValue());
             }