You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 10:57:18 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5880][DataProxy] Add data reporting time process logic (#5882)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new ac9c9de90 [INLONG-5880][DataProxy] Add data reporting time process logic (#5882)
ac9c9de90 is described below
commit ac9c9de90fca19b4091a6def4803527cf8ad1936
Author: Goson Zhang <46...@qq.com>
AuthorDate: Wed Sep 14 11:26:21 2022 +0800
[INLONG-5880][DataProxy] Add data reporting time process logic (#5882)
---
.../dataproxy/http/SimpleMessageHandler.java | 9 +++-
.../inlong/dataproxy/metrics/audit/AuditUtils.java | 8 +++-
.../dataproxy/source/DefaultServiceDecoder.java | 6 +--
.../dataproxy/source/ServerMessageHandler.java | 43 +++++++++---------
.../dataproxy/source/SimpleMessageHandler.java | 51 +++++++++-------------
5 files changed, 60 insertions(+), 57 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 0eeb614da..1969f7cea 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -28,6 +28,7 @@ import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
@@ -44,6 +45,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.ServiceDecoder;
+import org.apache.inlong.dataproxy.utils.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,9 +138,14 @@ public class SimpleMessageHandler implements MessageHandler {
headers.put(ConfigConstants.MSG_COUNTER_KEY, msgCount);
byte[] data = inLongMsg.buildArray();
headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
+ // add msgTime if not existed
+ long currTIme = System.currentTimeMillis();
+ String strMsgTime = request.getParameter(Constants.HEADER_KEY_MSG_TIME);
+ long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
LocalDateTime localDateTime =
- LocalDateTime.ofInstant(Instant.ofEpochMilli(inLongMsg.getCreatetime()), defZoneId);
+ LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
String pkgTime = DATE_FORMATTER.format(localDateTime);
+ headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
headers.put(ConfigConstants.PKG_TIME_KEY, pkgTime);
Event event = EventBuilder.withBody(data, headers);
inLongMsg.reset();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index f2bbbe52e..f7e9ae6b6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -25,6 +25,7 @@ import org.apache.inlong.audit.AuditImp;
import org.apache.inlong.audit.util.AuditConfig;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.utils.Constants;
@@ -88,7 +89,12 @@ public class AuditUtils {
String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers);
String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers);
long logTime = getLogTime(headers);
- AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+ long msgCount = 1L;
+ if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
+ msgCount = Long.parseLong(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
+ }
+ AuditImp.getInstance().add(auditID, inlongGroupId,
+ inlongStreamId, logTime, msgCount, event.getBody().length);
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index cb4497cbe..355dc7a2e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -26,6 +26,7 @@ import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.ErrorCode;
import org.apache.inlong.dataproxy.exception.MessageIDException;
+import org.apache.inlong.dataproxy.utils.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
@@ -112,9 +113,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
long uniq, long dataTime, int msgCount) {
commonAttrMap.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
String time = "";
- if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
- time = commonAttrMap
- .get(ConfigConstants.PKG_TIME_KEY);
+ if (commonAttrMap.containsKey(Constants.HEADER_KEY_MSG_TIME)) {
+ time = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
} else {
time = String.valueOf(dataTime);
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index d79143260..9d3b32c4e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
@@ -44,6 +45,7 @@ import org.apache.inlong.dataproxy.exception.MessageIDException;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
+import org.apache.inlong.dataproxy.utils.Constants;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.slf4j.Logger;
@@ -54,6 +56,10 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -85,8 +91,9 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
.on(AttributeConstants.SEPARATOR)
.trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
- private static final ThreadLocal<SimpleDateFormat> dateFormator =
- ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmm"));
+ private static final DateTimeFormatter DATE_FORMATTER
+ = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+ private static final ZoneId defZoneId = ZoneId.systemDefault();
private AbstractSource source;
@@ -416,23 +423,19 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
}
}
-
- long pkgTimeInMillis = inLongMsg.getCreatetime();
- String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis);
-
- if (inLongMsgVer == 4) {
- if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
- pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY);
- } else {
- pkgTimeStr = dateFormator.get().format(System.currentTimeMillis());
- }
- }
-
- if (commonAttrMap.get(AttributeConstants.DATA_TIME) != null) {
- headers.put(AttributeConstants.DATA_TIME, commonAttrMap.get(AttributeConstants.DATA_TIME));
- } else {
- headers.put(AttributeConstants.DATA_TIME, String.valueOf(System.currentTimeMillis()));
- }
+ // get msgTime
+ long currTIme = System.currentTimeMillis();
+ String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
+ long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
+ LocalDateTime localDateTime =
+ LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
+ String pkgTimeStr = DATE_FORMATTER.format(localDateTime);
+ headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
+ headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
+ // get data time
+ long dtTime = NumberUtils.toLong(
+ commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme);
+ headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime));
if ("false".equals(commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK))) {
headers.put(AttributeConstants.MESSAGE_IS_ACK, "false");
@@ -474,8 +477,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
.append(SEPARATOR).append(sequenceId);
headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
}
-
- headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
Event event = EventBuilder.withBody(data, headers);
String orderType = "non-order";
if (MessageUtils.isSyncSendForOrder(event)) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 3cc105aa7..38ca9ee32 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -32,7 +32,10 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -79,20 +82,10 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
.on(AttributeConstants.SEPARATOR)
.trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
- private static final ThreadLocal<SimpleDateFormat> dateFormator = new ThreadLocal<SimpleDateFormat>() {
+ private static final DateTimeFormatter DATE_FORMATTER
+ = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+ private static final ZoneId defZoneId = ZoneId.systemDefault();
- @Override
- protected SimpleDateFormat initialValue() {
- return new SimpleDateFormat("yyyyMMddHHmm");
- }
- };
- private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer = new ThreadLocal<SimpleDateFormat>() {
-
- @Override
- protected SimpleDateFormat initialValue() {
- return new SimpleDateFormat("yyyyMMddHHmmss");
- }
- };
private AbstractSource source;
private final ChannelGroup allChannels;
private int maxConnections = Integer.MAX_VALUE;
@@ -392,20 +385,18 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
}
}
-
- long pkgTimeInMillis = inLongMsg.getCreatetime();
- String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis);
-
- if (inLongMsgVer == 4) {
- if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
- pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY);
- } else {
- pkgTimeStr = dateFormator.get().format(System.currentTimeMillis());
- }
- }
-
- long dtTime = NumberUtils.toLong(commonAttrMap.get(AttributeConstants.DATA_TIME),
- System.currentTimeMillis());
+ // get msgTime
+ long currTIme = System.currentTimeMillis();
+ String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
+ long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
+ LocalDateTime localDateTime =
+ LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
+ String pkgTimeStr = DATE_FORMATTER.format(localDateTime);
+ headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
+ headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
+ // get data time
+ long dtTime = NumberUtils.toLong(
+ commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme);
headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime));
headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
@@ -428,8 +419,6 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
}
- headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
-
// process proxy message list
this.processProxyMessageList(headers, streamIdEntry.getValue());
}
@@ -474,7 +463,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
headers.put(Constants.INLONG_GROUP_ID, proxyMessage.getGroupId());
headers.put(Constants.INLONG_STREAM_ID, proxyMessage.getStreamId());
headers.put(Constants.TOPIC, proxyMessage.getTopic());
- headers.put(Constants.HEADER_KEY_MSG_TIME, commonHeaders.get(AttributeConstants.DATA_TIME));
+ //headers.put(Constants.HEADER_KEY_MSG_TIME, commonHeaders.get(AttributeConstants.DATA_TIME));
headers.put(Constants.HEADER_KEY_SOURCE_IP, commonHeaders.get(AttributeConstants.NODE_IP));
Event event = EventBuilder.withBody(proxyMessage.getData(), headers);
return event;