You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/12/23 10:13:53 UTC
[incubator-inlong] branch master updated: [INLONG-2056] The metric of DataProxy append a dimension(minute level) of event time, supporting audit reconciliation of minute level. (#2057)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3fe9c63 [INLONG-2056] The metric of DataProxy append a dimension(minute level) of event time, supporting audit reconciliation of minute level. (#2057)
3fe9c63 is described below
commit 3fe9c634dc0f872f5b4ee5c314228e37b28bf0cd
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Dec 23 18:13:48 2021 +0800
[INLONG-2056] The metric of DataProxy append a dimension(minute level) of event time, supporting audit reconciliation of minute level. (#2057)
---
.../dataproxy/config/holder/CommonPropertiesHolder.java | 15 +++++++++++++++
.../inlong/dataproxy/metrics/DataProxyMetricItem.java | 3 +++
.../sink/pulsar/federation/PulsarFederationWorker.java | 6 ++++++
.../sink/pulsar/federation/PulsarProducerCluster.java | 6 ++++--
.../inlong/dataproxy/source/ServerMessageHandler.java | 4 ++++
5 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index 6d9aa95..adbaba0 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.ClassUtils;
import org.apache.flume.Context;
import org.apache.inlong.dataproxy.config.loader.ClassResourceCommonPropertiesLoader;
import org.apache.inlong.dataproxy.config.loader.CommonPropertiesLoader;
+import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,8 @@ public class CommonPropertiesHolder {
private static Map<String, String> props;
+ private static long auditFormatInterval = 60000L;
+
/**
* init
*/
@@ -55,6 +58,8 @@ public class CommonPropertiesHolder {
CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject;
props.putAll(loader.load());
LOG.info("loaderClass:{},properties:{}", loaderClassName, props);
+ auditFormatInterval = NumberUtils
+ .toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
}
} catch (Throwable t) {
LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}",
@@ -113,4 +118,14 @@ public class CommonPropertiesHolder {
value = (value != null) ? value : props.getOrDefault(key, defaultValue);
return value;
}
+
+ /**
+ * Returns the interval, in milliseconds, used to round message times down for the msgTime metric dimension.
+ *
+ * @return the value parsed from the "auditFormatInterval" property when properties are loaded, else 60000
+ */
+ public static long getAuditFormatInterval() {
+ return auditFormatInterval;
+ }
+
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
index fd75537..e11cbd4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
@@ -41,6 +41,7 @@ public class DataProxyMetricItem extends MetricItem {
public static final String KEY_INLONG_STREAM_ID = "inlongStreamId";
public static final String KEY_SINK_ID = "sinkId";
public static final String KEY_SINK_DATA_ID = "sinkDataId";
+ public static final String KEY_MESSAGE_TIME = "msgTime";
//
public static final String M_READ_SUCCESS_COUNT = "readSuccessCount";
public static final String M_READ_SUCCESS_SIZE = "readSuccessSize";
@@ -71,6 +72,8 @@ public class DataProxyMetricItem extends MetricItem {
public String sinkId;
@Dimension
public String sinkDataId;
+ @Dimension
+ public String msgTime = String.valueOf(0);
@CountMetric
public AtomicLong readSuccessCount = new AtomicLong(0);
@CountMetric
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
index 914cff3..e3fed03 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
@@ -20,8 +20,10 @@ package org.apache.inlong.dataproxy.sink.pulsar.federation;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.lang.math.NumberUtils;
import org.apache.flume.Event;
import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.utils.Constants;
@@ -101,6 +103,10 @@ public class PulsarFederationWorker extends Thread {
DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
this.dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
currentRecord.getHeaders().get(Constants.TOPIC));
+ long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
+ System.currentTimeMillis());
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
metricItem.sendCount.incrementAndGet();
metricItem.sendSize.addAndGet(currentRecord.getBody().length);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
index 4c82144..6b8457f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
@@ -29,6 +29,7 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.utils.Constants;
@@ -269,14 +270,15 @@ public class PulsarProducerCluster implements LifecycleAware {
DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.cacheClusterName);
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
+ long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME), sendTime);
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
DataProxyMetricItem metricItem = this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
if (result) {
metricItem.sendSuccessCount.incrementAndGet();
metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
if (sendTime > 0) {
long currentTime = System.currentTimeMillis();
- long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
- sendTime);
long sinkDuration = currentTime - sendTime;
long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
long wholeDuration = currentTime - msgTime;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 3e05d92..1f6d87d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -42,6 +42,7 @@ import org.apache.flume.source.AbstractSource;
import org.apache.inlong.commons.msg.TDMsg1;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.ErrorCode;
@@ -688,6 +689,9 @@ public class ServerMessageHandler extends SimpleChannelHandler {
dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, source.getName());
dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, "");
dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, "");
+ long msgTime = System.currentTimeMillis();
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
if (result) {
metricItem.readSuccessCount.incrementAndGet();