You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/29 09:07:20 UTC

[inlong] branch master updated: [INLONG-5238][DataProxy] Error metric params in addSendFailMetric (#5239)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e631e2cd [INLONG-5238][DataProxy] Error metric params in addSendFailMetric (#5239)
4e631e2cd is described below

commit 4e631e2cda992e7806f115a755a593121195d39c
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Jul 29 17:07:15 2022 +0800

    [INLONG-5238][DataProxy] Error metric params in addSendFailMetric (#5239)
---
 .../org/apache/inlong/dataproxy/sink/SinkContext.java |  8 ++++----
 .../sink/kafkazone/KafkaZoneSinkContext.java          | 19 +++----------------
 .../sink/pulsarzone/PulsarZoneSinkContext.java        | 13 -------------
 .../dataproxy/sink/tubezone/TubeZoneSinkContext.java  | 19 +++----------------
 4 files changed, 10 insertions(+), 49 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
index dbf823bd8..03c9c6100 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
@@ -40,7 +40,7 @@ public class SinkContext {
     public static final String KEY_PROCESS_INTERVAL = "processInterval";
     public static final String KEY_RELOAD_INTERVAL = "reloadInterval";
 
-    protected final String clusterId;
+    protected final String proxyClusterId;
     protected final String sinkName;
     protected final Context sinkContext;
 
@@ -60,7 +60,7 @@ public class SinkContext {
         this.sinkName = sinkName;
         this.sinkContext = context;
         this.channel = channel;
-        this.clusterId = context.getString(CommonPropertiesHolder.KEY_PROXY_CLUSTER_NAME);
+        this.proxyClusterId = context.getString(CommonPropertiesHolder.KEY_PROXY_CLUSTER_NAME);
         this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
         this.processInterval = sinkContext.getInteger(KEY_PROCESS_INTERVAL, 100);
         this.reloadInterval = sinkContext.getLong(KEY_RELOAD_INTERVAL, 60000L);
@@ -117,8 +117,8 @@ public class SinkContext {
      *
      * @return the clusterId
      */
-    public String getClusterId() {
-        return clusterId;
+    public String getProxyClusterId() {
+        return proxyClusterId;
     }
 
     /**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
index f3985e8dd..f94bfd6aa 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
@@ -20,7 +20,6 @@ package org.apache.inlong.dataproxy.sink.kafkazone;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
 import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
@@ -46,7 +45,6 @@ public class KafkaZoneSinkContext extends SinkContext {
 
     private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
 
-    private final String proxyClusterId;
     private final String nodeId;
     private final Context producerContext;
     //
@@ -63,8 +61,6 @@ public class KafkaZoneSinkContext extends SinkContext {
             LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
         super(sinkName, context, channel);
         this.dispatchQueue = dispatchQueue;
-        // proxyClusterId
-        this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
         // nodeId
         this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
         // compressionType
@@ -101,15 +97,6 @@ public class KafkaZoneSinkContext extends SinkContext {
         this.cacheHolder.close();
     }
 
-    /**
-     * get proxyClusterId
-     * 
-     * @return the proxyClusterId
-     */
-    public String getProxyClusterId() {
-        return proxyClusterId;
-    }
-
     /**
      * get dispatchQueue
      * 
@@ -172,7 +159,7 @@ public class KafkaZoneSinkContext extends SinkContext {
      */
     public void addSendingMetric(DispatchProfile currentRecord, String bid) {
         Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
         // metric
         fillInlongId(currentRecord, dimensions);
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
@@ -192,7 +179,7 @@ public class KafkaZoneSinkContext extends SinkContext {
      */
     public void addSendFailMetric() {
         Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
         long msgTime = System.currentTimeMillis();
         long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
@@ -241,7 +228,7 @@ public class KafkaZoneSinkContext extends SinkContext {
      */
     public void addSendResultMetric(DispatchProfile currentRecord, String bid, boolean result, long sendTime) {
         Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
         // metric
         fillInlongId(currentRecord, dimensions);
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
index 8f5b22401..51cb7437a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
@@ -20,7 +20,6 @@ package org.apache.inlong.dataproxy.sink.pulsarzone;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
 import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
@@ -46,7 +45,6 @@ public class PulsarZoneSinkContext extends SinkContext {
 
     private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
 
-    private final String proxyClusterId;
     private final String nodeId;
     private final Context producerContext;
     //
@@ -63,8 +61,6 @@ public class PulsarZoneSinkContext extends SinkContext {
             LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
         super(sinkName, context, channel);
         this.dispatchQueue = dispatchQueue;
-        // proxyClusterId
-        this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
         // nodeId
         this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
         // compressionType
@@ -101,15 +97,6 @@ public class PulsarZoneSinkContext extends SinkContext {
         this.cacheHolder.close();
     }
 
-    /**
-     * get proxyClusterId
-     * 
-     * @return the proxyClusterId
-     */
-    public String getProxyClusterId() {
-        return proxyClusterId;
-    }
-
     /**
      * get dispatchQueue
      * 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
index f56cf3c81..9e94eae59 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
@@ -20,7 +20,6 @@ package org.apache.inlong.dataproxy.sink.tubezone;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
 import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
@@ -46,7 +45,6 @@ public class TubeZoneSinkContext extends SinkContext {
 
     private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
 
-    private final String proxyClusterId;
     private final String nodeId;
     private final Context producerContext;
     //
@@ -63,8 +61,6 @@ public class TubeZoneSinkContext extends SinkContext {
             LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
         super(sinkName, context, channel);
         this.dispatchQueue = dispatchQueue;
-        // proxyClusterId
-        this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
         // nodeId
         this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
         // compressionType
@@ -101,15 +97,6 @@ public class TubeZoneSinkContext extends SinkContext {
         this.cacheHolder.close();
     }
 
-    /**
-     * get proxyClusterId
-     * 
-     * @return the proxyClusterId
-     */
-    public String getProxyClusterId() {
-        return proxyClusterId;
-    }
-
     /**
      * get dispatchQueue
      * 
@@ -172,7 +159,7 @@ public class TubeZoneSinkContext extends SinkContext {
      */
     public void addSendMetric(DispatchProfile currentRecord, String bid) {
         Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
         // metric
         fillInlongId(currentRecord, dimensions);
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
@@ -192,7 +179,7 @@ public class TubeZoneSinkContext extends SinkContext {
      */
     public void addSendFailMetric() {
         Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
         long msgTime = System.currentTimeMillis();
         long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
@@ -241,7 +228,7 @@ public class TubeZoneSinkContext extends SinkContext {
      */
     public void addSendResultMetric(DispatchProfile currentRecord, String bid, boolean result, long sendTime) {
         Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
         // metric
         fillInlongId(currentRecord, dimensions);
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());