You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/29 09:07:20 UTC
[inlong] branch master updated: [INLONG-5238][DataProxy] Error metric params in addSendFailMetric (#5239)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4e631e2cd [INLONG-5238][DataProxy] Error metric params in addSendFailMetric (#5239)
4e631e2cd is described below
commit 4e631e2cda992e7806f115a755a593121195d39c
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Jul 29 17:07:15 2022 +0800
[INLONG-5238][DataProxy] Error metric params in addSendFailMetric (#5239)
---
.../org/apache/inlong/dataproxy/sink/SinkContext.java | 8 ++++----
.../sink/kafkazone/KafkaZoneSinkContext.java | 19 +++----------------
.../sink/pulsarzone/PulsarZoneSinkContext.java | 13 -------------
.../dataproxy/sink/tubezone/TubeZoneSinkContext.java | 19 +++----------------
4 files changed, 10 insertions(+), 49 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
index dbf823bd8..03c9c6100 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
@@ -40,7 +40,7 @@ public class SinkContext {
public static final String KEY_PROCESS_INTERVAL = "processInterval";
public static final String KEY_RELOAD_INTERVAL = "reloadInterval";
- protected final String clusterId;
+ protected final String proxyClusterId;
protected final String sinkName;
protected final Context sinkContext;
@@ -60,7 +60,7 @@ public class SinkContext {
this.sinkName = sinkName;
this.sinkContext = context;
this.channel = channel;
- this.clusterId = context.getString(CommonPropertiesHolder.KEY_PROXY_CLUSTER_NAME);
+ this.proxyClusterId = context.getString(CommonPropertiesHolder.KEY_PROXY_CLUSTER_NAME);
this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
this.processInterval = sinkContext.getInteger(KEY_PROCESS_INTERVAL, 100);
this.reloadInterval = sinkContext.getLong(KEY_RELOAD_INTERVAL, 60000L);
@@ -117,8 +117,8 @@ public class SinkContext {
*
* @return the clusterId
*/
- public String getClusterId() {
- return clusterId;
+ public String getProxyClusterId() {
+ return proxyClusterId;
}
/**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
index f3985e8dd..f94bfd6aa 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
@@ -20,7 +20,6 @@ package org.apache.inlong.dataproxy.sink.kafkazone;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
@@ -46,7 +45,6 @@ public class KafkaZoneSinkContext extends SinkContext {
private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
- private final String proxyClusterId;
private final String nodeId;
private final Context producerContext;
//
@@ -63,8 +61,6 @@ public class KafkaZoneSinkContext extends SinkContext {
LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
super(sinkName, context, channel);
this.dispatchQueue = dispatchQueue;
- // proxyClusterId
- this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
// nodeId
this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
// compressionType
@@ -101,15 +97,6 @@ public class KafkaZoneSinkContext extends SinkContext {
this.cacheHolder.close();
}
- /**
- * get proxyClusterId
- *
- * @return the proxyClusterId
- */
- public String getProxyClusterId() {
- return proxyClusterId;
- }
-
/**
* get dispatchQueue
*
@@ -172,7 +159,7 @@ public class KafkaZoneSinkContext extends SinkContext {
*/
public void addSendingMetric(DispatchProfile currentRecord, String bid) {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
// metric
fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
@@ -192,7 +179,7 @@ public class KafkaZoneSinkContext extends SinkContext {
*/
public void addSendFailMetric() {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
long msgTime = System.currentTimeMillis();
long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
@@ -241,7 +228,7 @@ public class KafkaZoneSinkContext extends SinkContext {
*/
public void addSendResultMetric(DispatchProfile currentRecord, String bid, boolean result, long sendTime) {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
// metric
fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
index 8f5b22401..51cb7437a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
@@ -20,7 +20,6 @@ package org.apache.inlong.dataproxy.sink.pulsarzone;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
@@ -46,7 +45,6 @@ public class PulsarZoneSinkContext extends SinkContext {
private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
- private final String proxyClusterId;
private final String nodeId;
private final Context producerContext;
//
@@ -63,8 +61,6 @@ public class PulsarZoneSinkContext extends SinkContext {
LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
super(sinkName, context, channel);
this.dispatchQueue = dispatchQueue;
- // proxyClusterId
- this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
// nodeId
this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
// compressionType
@@ -101,15 +97,6 @@ public class PulsarZoneSinkContext extends SinkContext {
this.cacheHolder.close();
}
- /**
- * get proxyClusterId
- *
- * @return the proxyClusterId
- */
- public String getProxyClusterId() {
- return proxyClusterId;
- }
-
/**
* get dispatchQueue
*
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
index f56cf3c81..9e94eae59 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
@@ -20,7 +20,6 @@ package org.apache.inlong.dataproxy.sink.tubezone;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
@@ -46,7 +45,6 @@ public class TubeZoneSinkContext extends SinkContext {
private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
- private final String proxyClusterId;
private final String nodeId;
private final Context producerContext;
//
@@ -63,8 +61,6 @@ public class TubeZoneSinkContext extends SinkContext {
LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
super(sinkName, context, channel);
this.dispatchQueue = dispatchQueue;
- // proxyClusterId
- this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
// nodeId
this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
// compressionType
@@ -101,15 +97,6 @@ public class TubeZoneSinkContext extends SinkContext {
this.cacheHolder.close();
}
- /**
- * get proxyClusterId
- *
- * @return the proxyClusterId
- */
- public String getProxyClusterId() {
- return proxyClusterId;
- }
-
/**
* get dispatchQueue
*
@@ -172,7 +159,7 @@ public class TubeZoneSinkContext extends SinkContext {
*/
public void addSendMetric(DispatchProfile currentRecord, String bid) {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
// metric
fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
@@ -192,7 +179,7 @@ public class TubeZoneSinkContext extends SinkContext {
*/
public void addSendFailMetric() {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
long msgTime = System.currentTimeMillis();
long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
@@ -241,7 +228,7 @@ public class TubeZoneSinkContext extends SinkContext {
*/
public void addSendResultMetric(DispatchProfile currentRecord, String bid, boolean result, long sendTime) {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
// metric
fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());