You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/12/23 10:13:39 UTC
[incubator-inlong] branch master updated: [INLONG-2058] The metric of Sort-standalone append a dimension(minute level) of event time, supporting audit reconciliation of minute level. (#2059)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c874aa9 [INLONG-2058] The metric of Sort-standalone append a dimension(minute level) of event time, supporting audit reconciliation of minute level. (#2059)
c874aa9 is described below
commit c874aa9048fe2ee9040aa1c6fbcbf336c9a085a0
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Dec 23 18:13:34 2021 +0800
[INLONG-2058] The metric of Sort-standalone append a dimension(minute level) of event time, supporting audit reconciliation of minute level. (#2059)
---
.../standalone/config/holder/CommonPropertiesHolder.java | 15 ++++++++++++++-
.../inlong/sort/standalone/metrics/SortMetricItem.java | 3 +++
.../standalone/sink/pulsar/PulsarFederationWorker.java | 6 ++++++
.../standalone/sink/pulsar/PulsarProducerCluster.java | 6 ++++--
4 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index 6006ccf..265fd04 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -25,8 +25,8 @@ import org.apache.commons.lang.math.NumberUtils;
import org.apache.flume.Context;
import org.apache.inlong.sort.standalone.config.loader.ClassResourceCommonPropertiesLoader;
import org.apache.inlong.sort.standalone.config.loader.CommonPropertiesLoader;
-import org.slf4j.Logger;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
/**
*
@@ -42,6 +42,8 @@ public class CommonPropertiesHolder {
private static Map<String, String> props;
private static Context context;
+ private static long auditFormatInterval = 60000L;
+
/**
* init
*/
@@ -58,6 +60,8 @@ public class CommonPropertiesHolder {
CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject;
props.putAll(loader.load());
LOG.info("loaderClass:{},properties:{}", loaderClassName, props);
+ auditFormatInterval = NumberUtils
+ .toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
}
} catch (Throwable t) {
LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}",
@@ -161,4 +165,13 @@ public class CommonPropertiesHolder {
public static String getClusterId() {
return getString(KEY_CLUSTER_ID);
}
+
+ /**
+ * getAuditFormatInterval
+ *
+ * @return
+ */
+ public static long getAuditFormatInterval() {
+ return auditFormatInterval;
+ }
}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
index 6120a42..ae0cf9f 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
@@ -42,6 +42,7 @@ public class SortMetricItem extends MetricItem {
public static final String KEY_INLONG_STREAM_ID = "inlongStreamId";
public static final String KEY_SINK_ID = "sinkId";// sortDestinationId
public static final String KEY_SINK_DATA_ID = "sinkDataId";// topic or dest ip
+ public static final String KEY_MESSAGE_TIME = "msgTime";
//
public static final String M_READ_SUCCESS_COUNT = "readSuccessCount";
public static final String M_READ_SUCCESS_SIZE = "readSuccessSize";
@@ -74,6 +75,8 @@ public class SortMetricItem extends MetricItem {
public String sinkId;
@Dimension
public String sinkDataId;
+ @Dimension
+ public String msgTime = String.valueOf(0);
@CountMetric
public AtomicLong readSuccessCount = new AtomicLong(0);
@CountMetric
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
index bf2a836..ddd5c5b 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
@@ -25,10 +25,12 @@ import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
/**
@@ -107,6 +109,10 @@ public class PulsarFederationWorker extends Thread {
// metric
SortMetricItem.fillInlongId(event, dimensions);
this.dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, event.getHeaders().get(Constants.TOPIC));
+ long msgTime = NumberUtils.toLong(event.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
+ System.currentTimeMillis());
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
SortMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
metricItem.sendCount.incrementAndGet();
metricItem.sendSize.addAndGet(event.getBody().length);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index 1e7a5e6..355abf1 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -29,6 +29,7 @@ import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.utils.Constants;
@@ -265,14 +266,15 @@ public class PulsarProducerCluster implements LifecycleAware {
SortMetricItem.fillInlongId(currentRecord, dimensions);
dimensions.put(SortMetricItem.KEY_SINK_ID, this.cacheClusterName);
dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, topic);
+ long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME), sendTime);
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
SortMetricItem metricItem = this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
if (result) {
metricItem.sendSuccessCount.incrementAndGet();
metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
if (sendTime > 0) {
long currentTime = System.currentTimeMillis();
- long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
- sendTime);
long sinkDuration = currentTime - sendTime;
long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
long wholeDuration = currentTime - msgTime;