You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/17 07:10:43 UTC
[inlong] branch master updated: [INLONG-5917][DataProxy] Optimize TubeSink class (#5920)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 64c20a36c [INLONG-5917][DataProxy] Optimize TubeSink class (#5920)
64c20a36c is described below
commit 64c20a36c179bfecc300618cc63c60401b04b741
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sat Sep 17 15:10:37 2022 +0800
[INLONG-5917][DataProxy] Optimize TubeSink class (#5920)
---
.../inlong/dataproxy/consts/ConfigConstants.java | 2 +
.../dataproxy/http/SimpleMessageHandler.java | 2 +
.../inlong/dataproxy/metrics/audit/AuditUtils.java | 23 ++-
.../dataproxy/sink/SimpleMessageTubeSink.java | 3 +-
.../org/apache/inlong/dataproxy/sink/TubeSink.java | 162 ++++++++-------------
.../inlong/dataproxy/sink/common/TubeUtils.java | 39 +++--
.../dataproxy/source/ServerMessageHandler.java | 2 +
.../dataproxy/source/SimpleMessageHandler.java | 4 +-
.../inlong/dataproxy/utils/InLongMsgVer.java | 54 +++++++
.../inlong/dataproxy/utils/MessageUtils.java | 34 +++++
10 files changed, 192 insertions(+), 133 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index dd79a9bb0..1a6fcada1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -75,6 +75,8 @@ public class ConfigConstants {
public static final String TOPIC_KEY = "topic";
public static final String REMOTE_IP_KEY = "srcIp";
+ public static final String DATAPROXY_IP_KEY = "dpIp";
+ public static final String MSG_ENCODE_VER = "msgEnType";
public static final String REMOTE_IDC_KEY = "idc";
public static final String MSG_COUNTER_KEY = "msgcnt";
public static final String PKG_COUNTER_KEY = "pkgcnt";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 0689039e1..16bb5c917 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -41,6 +41,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.ServiceDecoder;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
+import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -152,6 +153,7 @@ public class SimpleMessageHandler implements MessageHandler {
headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
headers.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
+ headers.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V0.getName());
byte[] data = inLongMsg.buildArray();
headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
Event event = EventBuilder.withBody(data, headers);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index f7e9ae6b6..24c251626 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -17,6 +17,8 @@
package org.apache.inlong.dataproxy.metrics.audit;
+import java.util.HashSet;
+import java.util.Map;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
@@ -28,9 +30,7 @@ import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.utils.Constants;
-
-import java.util.HashSet;
-import java.util.Map;
+import org.apache.inlong.dataproxy.utils.InLongMsgVer;
/**
*
@@ -79,13 +79,17 @@ public class AuditUtils {
/**
* add
- *
+ *
* @param auditID
* @param event
*/
public static void add(int auditID, Event event) {
- if (IS_AUDIT && event != null) {
- Map<String, String> headers = event.getHeaders();
+ if (!IS_AUDIT || event == null) {
+ return;
+ }
+ Map<String, String> headers = event.getHeaders();
+ String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER);
+ if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers);
String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers);
long logTime = getLogTime(headers);
@@ -95,6 +99,13 @@ public class AuditUtils {
}
AuditImp.getInstance().add(auditID, inlongGroupId,
inlongStreamId, logTime, msgCount, event.getBody().length);
+ } else {
+ String groupId = headers.get(AttributeConstants.GROUP_ID);
+ String streamId = headers.get(AttributeConstants.STREAM_ID);
+ long dataTime = NumberUtils.toLong(headers.get(AttributeConstants.DATA_TIME));
+ long msgCount = NumberUtils.toLong(headers.get(ConfigConstants.MSG_COUNTER_KEY));
+ AuditImp.getInstance().add(auditID, groupId,
+ streamId, dataTime, msgCount, event.getBody().length);
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
index a0e996029..28dea521b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
@@ -354,8 +354,7 @@ public class SimpleMessageTubeSink extends AbstractSink implements Configurable
logger.info("{} agent package {} existed,just discard.",
getName(), event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
} else {
- producer.sendMessage(TubeUtils.buildMessage(
- topic, event, true), new MyCallback(es));
+ producer.sendMessage(TubeUtils.buildMessage(topic, event), new MyCallback(es));
flag.set(true);
}
illegalTopicMap.remove(topic);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 0f96aa0b9..42e90ed0f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -17,10 +17,22 @@
package org.apache.inlong.dataproxy.sink;
+import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
+
import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections.SetUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -47,7 +59,7 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
import org.apache.inlong.dataproxy.sink.common.TubeProducerHolder;
import org.apache.inlong.dataproxy.sink.common.TubeUtils;
-import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
@@ -57,26 +69,11 @@ import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
-
public class TubeSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
private static final MsgDedupHandler MSG_DEDUP_HANDLER = new MsgDedupHandler();
private TubeProducerHolder producerHolder = null;
- private static final String TOPIC = "topic";
private volatile boolean canTake = false;
private volatile boolean canSend = false;
private volatile boolean isOverFlow = false;
@@ -276,12 +273,8 @@ public class TubeSink extends AbstractSink implements Configurable {
diskRateLimiter.acquire(event.getBody().length);
}
Map<String, String> dimensions;
- if (event.getHeaders().containsKey(TOPIC)) {
- dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
- event.getHeaders().get(TOPIC));
- } else {
- dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
- }
+ dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
+ event.getHeaders().getOrDefault(ConfigConstants.TOPIC_KEY, ""));
if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
tx.commit();
cachedMsgCnt.incrementAndGet();
@@ -335,20 +328,8 @@ public class TubeSink extends AbstractSink implements Configurable {
isOverFlow = false;
Thread.sleep(30);
}
- event = null;
- topic = null;
// get event from queues
- if (!resendQueue.isEmpty()) {
- es = resendQueue.poll();
- if (es == null) {
- continue;
- }
- resendMsgCnt.decrementAndGet();
- event = es.getEvent();
- if (event.getHeaders().containsKey(TOPIC)) {
- topic = event.getHeaders().get(TOPIC);
- }
- } else {
+ if (resendQueue.isEmpty()) {
event = eventQueue.poll(2000, TimeUnit.MILLISECONDS);
if (event == null) {
if (!canTake && takenMsgCnt.get() <= 0) {
@@ -360,10 +341,15 @@ public class TubeSink extends AbstractSink implements Configurable {
cachedMsgCnt.decrementAndGet();
takenMsgCnt.incrementAndGet();
es = new EventStat(event);
- if (event.getHeaders().containsKey(TOPIC)) {
- topic = event.getHeaders().get(TOPIC);
+ } else {
+ es = resendQueue.poll();
+ if (es == null) {
+ continue;
}
+ resendMsgCnt.decrementAndGet();
+ event = es.getEvent();
}
+ topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
// valid event status
if (StringUtils.isBlank(topic)) {
blankTopicDiscardMsgCnt.incrementAndGet();
@@ -427,8 +413,7 @@ public class TubeSink extends AbstractSink implements Configurable {
event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
return false;
} else {
- producer.sendMessage(TubeUtils.buildMessage(
- topic, event, false), new MyCallback(es));
+ producer.sendMessage(TubeUtils.buildMessage(topic, event), new MyCallback(es));
inflightMsgCnt.incrementAndGet();
return true;
}
@@ -493,7 +478,8 @@ public class TubeSink extends AbstractSink implements Configurable {
Map<String, String> dimensions = new HashMap<>();
dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, TubeSink.this.getName());
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, TubeSink.this.getName());
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, ""));
+ dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
+ event.getHeaders().get(ConfigConstants.TOPIC_KEY));
DataProxyMetricItem.fillInlongId(event, dimensions);
DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
DataProxyMetricItem metricItem = TubeSink.this.metricItemSet.findMetricItem(dimensions);
@@ -503,14 +489,13 @@ public class TubeSink extends AbstractSink implements Configurable {
AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
if (sendTime > 0) {
long currentTime = System.currentTimeMillis();
- long msgTime = NumberUtils.toLong(event.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
- sendTime);
- long sinkDuration = currentTime - sendTime;
- long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
- long wholeDuration = currentTime - msgTime;
- metricItem.sinkDuration.addAndGet(sinkDuration);
- metricItem.nodeDuration.addAndGet(nodeDuration);
- metricItem.wholeDuration.addAndGet(wholeDuration);
+ long msgDataTimeL = Long.parseLong(
+ event.getHeaders().get(AttributeConstants.DATA_TIME));
+ long msgRcvTimeL = Long.parseLong(
+ event.getHeaders().get(AttributeConstants.RCV_TIME));
+ metricItem.sinkDuration.addAndGet(currentTime - sendTime);
+ metricItem.nodeDuration.addAndGet(currentTime - msgRcvTimeL);
+ metricItem.wholeDuration.addAndGet(currentTime - msgDataTimeL);
}
} else {
metricItem.sendFailCount.incrementAndGet();
@@ -519,60 +504,31 @@ public class TubeSink extends AbstractSink implements Configurable {
}
private void editStatistic(final Event event, boolean isSuccess) {
- String topic = "";
- String streamId = "";
- String nodeIp;
- if (event != null) {
- if (event.getHeaders().containsKey(TOPIC)) {
- topic = event.getHeaders().get(TOPIC);
- }
- if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
- streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
- } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
- streamId = event.getHeaders().get(AttributeConstants.INAME);
- }
- // Compatible agent
- if (event.getHeaders().containsKey("ip")) {
- event.getHeaders().put(ConfigConstants.REMOTE_IP_KEY, event.getHeaders().get("ip"));
- event.getHeaders().remove("ip");
- }
- // Compatible agent
- if (event.getHeaders().containsKey("time")) {
- event.getHeaders().put(AttributeConstants.DATA_TIME, event.getHeaders().get("time"));
- event.getHeaders().remove("time");
- }
- if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IP_KEY)) {
- nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
- if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IDC_KEY)) {
- if (nodeIp != null) {
- nodeIp = nodeIp.split(":")[0];
- }
- long msgCounterL = 1L;
- // msg counter
- if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
- msgCounterL = Integer.parseInt(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
- }
- StringBuilder newBase = new StringBuilder();
- newBase.append(getName()).append(SEP_HASHTAG).append(topic)
- .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
- .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
- .append(SEP_HASHTAG).append("non-order").append(SEP_HASHTAG)
- .append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
- long messageSize = event.getBody().length;
- if (event.getHeaders().get(ConfigConstants.TOTAL_LEN) != null) {
- messageSize = Long.parseLong(event.getHeaders().get(ConfigConstants.TOTAL_LEN));
- }
- if (statIntervalSec > 0) {
- if (isSuccess) {
- monitorIndex.addAndGet(new String(newBase),
- (int) msgCounterL, 1, messageSize, 0);
- } else {
- monitorIndex.addAndGet(new String(newBase),
- 0, 0, 0, (int) msgCounterL);
- }
- }
- }
- }
+ if (event == null || statIntervalSec <= 0) {
+ return;
+ }
+ // get statistic items
+ String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
+ String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
+ String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
+ int intMsgCnt = Integer.parseInt(
+ event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
+ long dataTimeL = Long.parseLong(
+ event.getHeaders().get(AttributeConstants.DATA_TIME));
+ // build statistic key
+ StringBuilder newBase = new StringBuilder(512);
+ newBase.append(getName()).append(SEP_HASHTAG).append(topic)
+ .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
+ .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
+ .append(SEP_HASHTAG).append("non-order").append(SEP_HASHTAG)
+ .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+ // count data
+ if (isSuccess) {
+ monitorIndex.addAndGet(newBase.toString(),
+ intMsgCnt, 1, event.getBody().length, 0);
+ } else {
+ monitorIndex.addAndGet(newBase.toString(),
+ 0, 0, 0, intMsgCnt);
}
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
index c1e11cf9d..16c10e384 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
@@ -17,17 +17,18 @@
package org.apache.inlong.dataproxy.sink.common;
+import java.util.Map;
import org.apache.flume.Event;
-import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
+import org.apache.inlong.dataproxy.utils.InLongMsgVer;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.corebase.Message;
-import java.util.Map;
-
public class TubeUtils {
/**
@@ -53,28 +54,24 @@ public class TubeUtils {
*
* @param topicName the topic name of message
* @param event the DataProxy event
- * @param addExtraAttrs whether to add extra attributes
* @return the message object
*/
- public static Message buildMessage(String topicName,
- Event event, boolean addExtraAttrs) {
+ public static Message buildMessage(String topicName, Event event) {
+ Map<String, String> headers = event.getHeaders();
Message message = new Message(topicName, event.getBody());
- message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
- String streamId = "";
- if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
- streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
- } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
- streamId = event.getHeaders().get(AttributeConstants.INAME);
+ String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER);
+ if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
+ String streamId = headers.get(Constants.INLONG_STREAM_ID);
+ message.putSystemHeader(streamId,
+ headers.get(ConfigConstants.PKG_TIME_KEY));
+ } else {
+ long dataTimeL = Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
+ message.putSystemHeader(headers.get(AttributeConstants.STREAM_ID),
+ DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
}
- message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
- if (addExtraAttrs) {
- // common attributes
- Map<String, String> headers = event.getHeaders();
- message.setAttrKeyVal(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID));
- message.setAttrKeyVal(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID));
- message.setAttrKeyVal(Constants.TOPIC, headers.get(Constants.TOPIC));
- message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME));
- message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP));
+ Map<String, String> extraAttrMap = MessageUtils.getXfsAttrs(headers, pkgVersion);
+ for (Map.Entry<String, String> entry : extraAttrMap.entrySet()) {
+ message.setAttrKeyVal(entry.getKey(), entry.getValue());
}
return message;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 61a146b25..6e92cb6ba 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -57,6 +57,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
+import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -412,6 +413,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
headers.put(ConfigConstants.MSG_COUNTER_KEY,
commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
+ headers.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V0.getName());
headers.put(AttributeConstants.RCV_TIME,
commonAttrMap.get(AttributeConstants.RCV_TIME));
// add extra key-value information
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 71372da18..1e2d54405 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -59,6 +59,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -463,8 +464,9 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
headers.put(Constants.INLONG_GROUP_ID, proxyMessage.getGroupId());
headers.put(Constants.INLONG_STREAM_ID, proxyMessage.getStreamId());
headers.put(Constants.TOPIC, proxyMessage.getTopic());
- //headers.put(Constants.HEADER_KEY_MSG_TIME, commonHeaders.get(AttributeConstants.DATA_TIME));
+ headers.put(Constants.HEADER_KEY_MSG_TIME, commonHeaders.get(AttributeConstants.DATA_TIME));
headers.put(Constants.HEADER_KEY_SOURCE_IP, commonHeaders.get(AttributeConstants.NODE_IP));
+ headers.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V1.getName());
Event event = EventBuilder.withBody(proxyMessage.getData(), headers);
return event;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java
new file mode 100644
index 000000000..626398df6
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+public enum InLongMsgVer {
+ INLONG_V0(0, "V0", "The inlong-msg V0 format"),
+ INLONG_V1(1, "V1", "The inlong-msg V1 format");
+
+ InLongMsgVer(int id, String name, String desc) {
+ this.id = id;
+ this.name = name;
+ this.desc = desc;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+
+ public static InLongMsgVer valueOf(int value) {
+ for (InLongMsgVer inLongMsgVer : InLongMsgVer.values()) {
+ if (inLongMsgVer.getId() == value) {
+ return inLongMsgVer;
+ }
+ }
+ return INLONG_V0;
+ }
+
+ private final int id;
+ private final String name;
+ private final String desc;
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
index 4e9fd2395..d8835658d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
@@ -18,10 +18,13 @@ package org.apache.inlong.dataproxy.utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Event;
+import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.source.MsgType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,4 +110,35 @@ public class MessageUtils {
return topic;
}
+ public static Map<String, String> getXfsAttrs(Map<String, String> headers, String pkgVersion) {
+ // common attributes
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put(ConfigConstants.MSG_ENCODE_VER, pkgVersion);
+ if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
+ attrs.put("dataproxyip", NetworkUtils.getLocalIp());
+ attrs.put(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID));
+ attrs.put(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID));
+ attrs.put(Constants.TOPIC, headers.get(Constants.TOPIC));
+ attrs.put(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME));
+ attrs.put(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP));
+ } else {
+ //
+ attrs.put(Constants.INLONG_GROUP_ID,
+ headers.get(AttributeConstants.GROUP_ID));
+ attrs.put(Constants.INLONG_STREAM_ID,
+ headers.get(AttributeConstants.STREAM_ID));
+ attrs.put(Constants.TOPIC,
+ headers.get(ConfigConstants.TOPIC_KEY));
+ attrs.put(Constants.HEADER_KEY_MSG_TIME,
+ headers.get(AttributeConstants.DATA_TIME));
+ attrs.put(Constants.HEADER_KEY_SOURCE_IP,
+ headers.get(ConfigConstants.REMOTE_IP_KEY));
+ attrs.put(Constants.HEADER_KEY_SOURCE_TIME,
+ headers.get(AttributeConstants.RCV_TIME));
+ attrs.put(ConfigConstants.DATAPROXY_IP_KEY,
+ NetworkUtils.getLocalIp());
+ }
+ return attrs;
+ }
+
}