You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/12/23 10:13:39 UTC

[incubator-inlong] branch master updated: [INLONG-2058] The metric of Sort-standalone append a dimension(minute level) of event time, supporting audit reconciliation of minute level. (#2059)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c874aa9  [INLONG-2058] The metric of Sort-standalone append a dimension(minute level) of event time, supporting audit reconciliation of minute level. (#2059)
c874aa9 is described below

commit c874aa9048fe2ee9040aa1c6fbcbf336c9a085a0
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Dec 23 18:13:34 2021 +0800

    [INLONG-2058] The metric of Sort-standalone append a dimension(minute level) of event time, supporting audit reconciliation of minute level. (#2059)
---
 .../standalone/config/holder/CommonPropertiesHolder.java  | 15 ++++++++++++++-
 .../inlong/sort/standalone/metrics/SortMetricItem.java    |  3 +++
 .../standalone/sink/pulsar/PulsarFederationWorker.java    |  6 ++++++
 .../standalone/sink/pulsar/PulsarProducerCluster.java     |  6 ++++--
 4 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index 6006ccf..265fd04 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -25,8 +25,8 @@ import org.apache.commons.lang.math.NumberUtils;
 import org.apache.flume.Context;
 import org.apache.inlong.sort.standalone.config.loader.ClassResourceCommonPropertiesLoader;
 import org.apache.inlong.sort.standalone.config.loader.CommonPropertiesLoader;
-import org.slf4j.Logger;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
 
 /**
  * 
@@ -42,6 +42,8 @@ public class CommonPropertiesHolder {
     private static Map<String, String> props;
     private static Context context;
 
+    private static long auditFormatInterval = 60000L;
+
     /**
      * init
      */
@@ -58,6 +60,8 @@ public class CommonPropertiesHolder {
                         CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject;
                         props.putAll(loader.load());
                         LOG.info("loaderClass:{},properties:{}", loaderClassName, props);
+                        auditFormatInterval = NumberUtils
+                                .toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
                     }
                 } catch (Throwable t) {
                     LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}",
@@ -161,4 +165,13 @@ public class CommonPropertiesHolder {
     public static String getClusterId() {
         return getString(KEY_CLUSTER_ID);
     }
+
+    /**
+     * getAuditFormatInterval
+     * 
+     * @return
+     */
+    public static long getAuditFormatInterval() {
+        return auditFormatInterval;
+    }
 }
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
index 6120a42..ae0cf9f 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
@@ -42,6 +42,7 @@ public class SortMetricItem extends MetricItem {
     public static final String KEY_INLONG_STREAM_ID = "inlongStreamId";
     public static final String KEY_SINK_ID = "sinkId";// sortDestinationId
     public static final String KEY_SINK_DATA_ID = "sinkDataId";// topic or dest ip
+    public static final String KEY_MESSAGE_TIME = "msgTime";
     //
     public static final String M_READ_SUCCESS_COUNT = "readSuccessCount";
     public static final String M_READ_SUCCESS_SIZE = "readSuccessSize";
@@ -74,6 +75,8 @@ public class SortMetricItem extends MetricItem {
     public String sinkId;
     @Dimension
     public String sinkDataId;
+    @Dimension
+    public String msgTime = String.valueOf(0);
     @CountMetric
     public AtomicLong readSuccessCount = new AtomicLong(0);
     @CountMetric
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
index bf2a836..ddd5c5b 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
@@ -25,10 +25,12 @@ import org.apache.flume.Channel;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
 
 /**
@@ -107,6 +109,10 @@ public class PulsarFederationWorker extends Thread {
                 // metric
                 SortMetricItem.fillInlongId(event, dimensions);
                 this.dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, event.getHeaders().get(Constants.TOPIC));
+                long msgTime = NumberUtils.toLong(event.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
+                        System.currentTimeMillis());
+                long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+                dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
                 SortMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
                 metricItem.sendCount.incrementAndGet();
                 metricItem.sendSize.addAndGet(event.getBody().length);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index 1e7a5e6..355abf1 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -29,6 +29,7 @@ import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.utils.Constants;
@@ -265,14 +266,15 @@ public class PulsarProducerCluster implements LifecycleAware {
         SortMetricItem.fillInlongId(currentRecord, dimensions);
         dimensions.put(SortMetricItem.KEY_SINK_ID, this.cacheClusterName);
         dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, topic);
+        long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME), sendTime);
+        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         SortMetricItem metricItem = this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
         if (result) {
             metricItem.sendSuccessCount.incrementAndGet();
             metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
             if (sendTime > 0) {
                 long currentTime = System.currentTimeMillis();
-                long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
-                        sendTime);
                 long sinkDuration = currentTime - sendTime;
                 long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
                 long wholeDuration = currentTime - msgTime;