You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/12/23 10:13:53 UTC

[incubator-inlong] branch master updated: [INLONG-2056] The metric of DataProxy append a dimension(minute level) of event time, supporting audit reconciliation of minute level. (#2057)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fe9c63  [INLONG-2056] The metric of DataProxy append a dimension(minute level) of event time, supporting audit reconciliation of minute level.  (#2057)
3fe9c63 is described below

commit 3fe9c634dc0f872f5b4ee5c314228e37b28bf0cd
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Dec 23 18:13:48 2021 +0800

    [INLONG-2056] The metric of DataProxy append a dimension(minute level) of event time, supporting audit reconciliation of minute level.  (#2057)
---
 .../dataproxy/config/holder/CommonPropertiesHolder.java   | 15 +++++++++++++++
 .../inlong/dataproxy/metrics/DataProxyMetricItem.java     |  3 +++
 .../sink/pulsar/federation/PulsarFederationWorker.java    |  6 ++++++
 .../sink/pulsar/federation/PulsarProducerCluster.java     |  6 ++++--
 .../inlong/dataproxy/source/ServerMessageHandler.java     |  4 ++++
 5 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index 6d9aa95..adbaba0 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.ClassUtils;
 import org.apache.flume.Context;
 import org.apache.inlong.dataproxy.config.loader.ClassResourceCommonPropertiesLoader;
 import org.apache.inlong.dataproxy.config.loader.CommonPropertiesLoader;
+import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,8 @@ public class CommonPropertiesHolder {
 
     private static Map<String, String> props;
 
+    private static long auditFormatInterval = 60000L;
+
     /**
      * init
      */
@@ -55,6 +58,8 @@ public class CommonPropertiesHolder {
                         CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject;
                         props.putAll(loader.load());
                         LOG.info("loaderClass:{},properties:{}", loaderClassName, props);
+                        auditFormatInterval = NumberUtils
+                                .toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
                     }
                 } catch (Throwable t) {
                     LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}",
@@ -113,4 +118,14 @@ public class CommonPropertiesHolder {
         value = (value != null) ? value : props.getOrDefault(key, defaultValue);
         return value;
     }
+
+    /**
+     * Gets the bucket width, in milliseconds, used to round message times down for audit metrics.
+     * 
+     * @return value of the "auditFormatInterval" property parsed during init; 60000 by default
+     */
+    public static long getAuditFormatInterval() {
+        return auditFormatInterval;
+    }
+
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
index fd75537..e11cbd4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
@@ -41,6 +41,7 @@ public class DataProxyMetricItem extends MetricItem {
     public static final String KEY_INLONG_STREAM_ID = "inlongStreamId";
     public static final String KEY_SINK_ID = "sinkId";
     public static final String KEY_SINK_DATA_ID = "sinkDataId";
+    public static final String KEY_MESSAGE_TIME = "msgTime";
     //
     public static final String M_READ_SUCCESS_COUNT = "readSuccessCount";
     public static final String M_READ_SUCCESS_SIZE = "readSuccessSize";
@@ -71,6 +72,8 @@ public class DataProxyMetricItem extends MetricItem {
     public String sinkId;
     @Dimension
     public String sinkDataId;
+    @Dimension
+    public String msgTime = String.valueOf(0);
     @CountMetric
     public AtomicLong readSuccessCount = new AtomicLong(0);
     @CountMetric
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
index 914cff3..e3fed03 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
@@ -20,8 +20,10 @@ package org.apache.inlong.dataproxy.sink.pulsar.federation;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.lang.math.NumberUtils;
 import org.apache.flume.Event;
 import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.utils.Constants;
@@ -101,6 +103,10 @@ public class PulsarFederationWorker extends Thread {
                 DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
                 this.dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
                         currentRecord.getHeaders().get(Constants.TOPIC));
+                long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
+                        System.currentTimeMillis());
+                long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+                dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
                 DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
                 metricItem.sendCount.incrementAndGet();
                 metricItem.sendSize.addAndGet(currentRecord.getBody().length);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
index 4c82144..6b8457f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
@@ -29,6 +29,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.utils.Constants;
@@ -269,14 +270,15 @@ public class PulsarProducerCluster implements LifecycleAware {
         DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.cacheClusterName);
         dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
+        long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME), sendTime);
+        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         DataProxyMetricItem metricItem = this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
         if (result) {
             metricItem.sendSuccessCount.incrementAndGet();
             metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
             if (sendTime > 0) {
                 long currentTime = System.currentTimeMillis();
-                long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
-                        sendTime);
                 long sinkDuration = currentTime - sendTime;
                 long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
                 long wholeDuration = currentTime - msgTime;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 3e05d92..1f6d87d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -42,6 +42,7 @@ import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.commons.msg.TDMsg1;
 import org.apache.inlong.dataproxy.base.ProxyMessage;
 import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.exception.ErrorCode;
@@ -688,6 +689,9 @@ public class ServerMessageHandler extends SimpleChannelHandler {
         dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, source.getName());
         dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, "");
         dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, "");
+        long msgTime = System.currentTimeMillis();
+        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
         if (result) {
             metricItem.readSuccessCount.incrementAndGet();