You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/19 11:26:43 UTC
[inlong] branch master updated: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items(addendum) (#5929)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cdd279ea8 [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items(addendum) (#5929)
cdd279ea8 is described below
commit cdd279ea867e523b346809ac006dab5aafcee790
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Sep 19 19:26:38 2022 +0800
[INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items(addendum) (#5929)
---
.../inlong/dataproxy/sink/common/TubeUtils.java | 6 +-
.../dataproxy/source/ServerMessageFactory.java | 7 +--
.../dataproxy/source/SimpleMessageHandler.java | 67 ++++++++++++----------
3 files changed, 44 insertions(+), 36 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
index 16c10e384..c1ed4b44d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
@@ -61,9 +61,9 @@ public class TubeUtils {
Message message = new Message(topicName, event.getBody());
String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER);
if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
- String streamId = headers.get(Constants.INLONG_STREAM_ID);
- message.putSystemHeader(streamId,
- headers.get(ConfigConstants.PKG_TIME_KEY));
+ long dataTimeL = Long.parseLong(headers.get(ConfigConstants.PKG_TIME_KEY));
+ message.putSystemHeader(headers.get(Constants.INLONG_STREAM_ID),
+ DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
} else {
long dataTimeL = Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
message.putSystemHeader(headers.get(AttributeConstants.STREAM_ID),
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index 69e891396..93a2a8c95 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -26,7 +26,6 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
import java.lang.reflect.Constructor;
import java.util.concurrent.TimeUnit;
import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.source.AbstractSource;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
@@ -46,7 +45,7 @@ public class ServerMessageFactory
private static int MSG_LENGTH_LEN = 4;
- private AbstractSource source;
+ private BaseSource source;
private ChannelProcessor processor;
@@ -93,7 +92,7 @@ public class ServerMessageFactory
* @param monitorIndexExt
* @param name
*/
- public ServerMessageFactory(AbstractSource source, ChannelGroup allChannels, String protocol,
+ public ServerMessageFactory(BaseSource source, ChannelGroup allChannels, String protocol,
ServiceDecoder serviceDecoder, String messageHandlerName, Integer maxMsgLength,
String topic, String attr, Boolean filterEmptyMsg, Integer maxCons,
Boolean isCompressed, MonitorIndex monitorIndex, MonitorIndexExt monitorIndexExt,
@@ -135,7 +134,7 @@ public class ServerMessageFactory
.forName(messageHandlerName);
Constructor<?> ctor = clazz.getConstructor(
- AbstractSource.class, ServiceDecoder.class, ChannelGroup.class,
+ BaseSource.class, ServiceDecoder.class, ChannelGroup.class,
String.class, String.class, Boolean.class,
Integer.class, Boolean.class, MonitorIndex.class,
MonitorIndexExt.class, String.class);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index b101b9651..ea2691980 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -22,26 +22,24 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA
import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.group.ChannelGroup;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.ChannelException;
@@ -62,9 +60,6 @@ import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-
/**
* Server message handler
*
@@ -82,10 +77,20 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
.on(AttributeConstants.SEPARATOR)
.trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
- private static final DateTimeFormatter DATE_FORMATTER
- = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
- private static final ZoneId defZoneId = ZoneId.systemDefault();
+ private static final ThreadLocal<SimpleDateFormat> dateFormator = new ThreadLocal<SimpleDateFormat>() {
+
+ @Override
+ protected SimpleDateFormat initialValue() {
+ return new SimpleDateFormat("yyyyMMddHHmm");
+ }
+ };
+ private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer = new ThreadLocal<SimpleDateFormat>() {
+ @Override
+ protected SimpleDateFormat initialValue() {
+ return new SimpleDateFormat("yyyyMMddHHmmss");
+ }
+ };
private BaseSource source;
private final ChannelGroup allChannels;
private int maxConnections = Integer.MAX_VALUE;
@@ -381,18 +386,20 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
}
}
- // get msgTime
- long currTIme = System.currentTimeMillis();
- String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
- long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
- LocalDateTime localDateTime =
- LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
- String pkgTimeStr = DATE_FORMATTER.format(localDateTime);
- headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
- headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
- // get data time
- long dtTime = NumberUtils.toLong(
- commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme);
+
+ long pkgTimeInMillis = inLongMsg.getCreatetime();
+ String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis);
+
+ if (inLongMsgVer == 4) {
+ if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
+ pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY);
+ } else {
+ pkgTimeStr = dateFormator.get().format(System.currentTimeMillis());
+ }
+ }
+
+ long dtTime = NumberUtils.toLong(commonAttrMap.get(AttributeConstants.DATA_TIME),
+ System.currentTimeMillis());
headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime));
headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
@@ -415,6 +422,8 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
}
+ headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
+
// process proxy message list
this.processProxyMessageList(headers, streamIdEntry.getValue());
}