You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/07 08:31:58 UTC
[inlong] branch master updated: [INLONG-4854][Agent] Report metrics at inongGroupId and inlongStreamId (#4855)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dc9a06f39 [INLONG-4854][Agent] Report metrics at inongGroupId and inlongStreamId (#4855)
dc9a06f39 is described below
commit dc9a06f3940d39332b0fdde842af9b6eb4f31413
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Thu Jul 7 16:31:51 2022 +0800
[INLONG-4854][Agent] Report metrics at inongGroupId and inlongStreamId (#4855)
---
.../inlong/agent/plugin/channel/MemoryChannel.java | 54 +++----
.../inlong/agent/plugin/metrics/GlobalMetrics.java | 175 +++++++++++++++++++++
.../agent/plugin/metrics/PluginJmxMetric.java | 4 +-
.../plugin/metrics/PluginPrometheusMetric.java | 50 +++---
.../inlong/agent/plugin/metrics/SinkJmxMetric.java | 2 +-
.../metrics/{SinkMetrics.java => SinkMetric.java} | 2 +-
...theusMetrics.java => SinkPrometheusMetric.java} | 16 +-
.../agent/plugin/metrics/SourceJmxMetric.java | 2 +-
.../{SourceMetrics.java => SourceMetric.java} | 8 +-
...eusMetrics.java => SourcePrometheusMetric.java} | 24 +--
.../inlong/agent/plugin/sinks/AbstractSink.java | 29 +---
.../inlong/agent/plugin/sinks/ConsoleSink.java | 12 +-
.../inlong/agent/plugin/sinks/ProxySink.java | 9 +-
.../inlong/agent/plugin/sinks/SenderManager.java | 109 ++++++-------
.../inlong/agent/plugin/sources/BinlogSource.java | 26 ++-
.../agent/plugin/sources/DatabaseSqlSource.java | 36 ++---
.../inlong/agent/plugin/sources/KafkaSource.java | 25 ++-
.../agent/plugin/sources/TextFileSource.java | 30 ++--
.../plugin/sources/reader/AbstractReader.java | 41 +----
.../agent/plugin/sources/reader/BinlogReader.java | 7 +-
.../agent/plugin/sources/reader/KafkaReader.java | 34 +---
.../agent/plugin/sources/reader/SqlReader.java | 40 +++--
.../plugin/sources/reader/TextFileReader.java | 10 +-
.../agent/plugin/metrics/GlobalMetricsTest.java | 66 ++++++++
.../apache/inlong/agent/plugin/sinks/MockSink.java | 38 ++---
25 files changed, 480 insertions(+), 369 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
index 56769a30f..583a3c5ae 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
@@ -19,13 +19,10 @@ package org.apache.inlong.agent.plugin.channel;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.plugin.Channel;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginPrometheusMetric;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +30,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+
/**
* memory channel
*/
@@ -42,48 +41,48 @@ public class MemoryChannel implements Channel {
private static final String MEMORY_CHANNEL_TAG_NAME = "AgentMemoryPlugin";
private static AtomicLong metricsIndex = new AtomicLong(0);
- private final PluginMetric pluginMetricNew;
private LinkedBlockingQueue<Message> queue;
public MemoryChannel() {
- if (ConfigUtil.isPrometheusEnabled()) {
- this.pluginMetricNew = new PluginPrometheusMetric(AgentUtils.getUniqId(
- MEMORY_CHANNEL_TAG_NAME, metricsIndex.incrementAndGet()));
- } else {
- this.pluginMetricNew = new PluginJmxMetric(AgentUtils.getUniqId(
- MEMORY_CHANNEL_TAG_NAME, metricsIndex.incrementAndGet()));
- }
}
@Override
public void push(Message message) {
+ String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
try {
if (message != null) {
- pluginMetricNew.incReadNum();
+ if (message instanceof ProxyMessage) {
+ groupId = ((ProxyMessage) message).getInlongGroupId();
+ }
+ GlobalMetrics.incReadNum(groupId);
queue.put(message);
- pluginMetricNew.incReadSuccessNum();
+ GlobalMetrics.incReadSuccessNum(groupId);
}
} catch (InterruptedException ex) {
- pluginMetricNew.incReadFailedNum();
+ GlobalMetrics.incReadFailedNum(groupId);
Thread.currentThread().interrupt();
}
}
@Override
public boolean push(Message message, long timeout, TimeUnit unit) {
+ String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
try {
if (message != null) {
- pluginMetricNew.incReadNum();
+ if (message instanceof ProxyMessage) {
+ groupId = ((ProxyMessage) message).getInlongGroupId();
+ }
+ GlobalMetrics.incReadNum(groupId);
boolean result = queue.offer(message, timeout, unit);
if (result) {
- pluginMetricNew.incReadSuccessNum();
+ GlobalMetrics.incReadSuccessNum(groupId);
} else {
- pluginMetricNew.incReadFailedNum();
+ GlobalMetrics.incReadFailedNum(groupId);
}
return result;
}
} catch (InterruptedException ex) {
- pluginMetricNew.incReadFailedNum();
+ GlobalMetrics.incReadFailedNum(groupId);
Thread.currentThread().interrupt();
}
return false;
@@ -91,14 +90,18 @@ public class MemoryChannel implements Channel {
@Override
public Message pull(long timeout, TimeUnit unit) {
+ String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
try {
Message message = queue.poll(timeout, unit);
if (message != null) {
- pluginMetricNew.incSendSuccessNum();
+ if (message instanceof ProxyMessage) {
+ groupId = ((ProxyMessage) message).getInlongGroupId();
+ }
+ GlobalMetrics.incSendSuccessNum(groupId);
}
return message;
} catch (InterruptedException ex) {
- pluginMetricNew.incSendFailedNum();
+ GlobalMetrics.incSendFailedNum(groupId);
Thread.currentThread().interrupt();
throw new IllegalStateException(ex);
}
@@ -116,10 +119,7 @@ public class MemoryChannel implements Channel {
if (queue != null) {
queue.clear();
}
- LOGGER.info("destroy channel, memory channel metric, readNum: {}, readSuccessNum: {}, "
- + "readFailedNum: {}, sendSuccessNum: {}, sendFailedNum: {}",
- pluginMetricNew.getReadNum(), pluginMetricNew.getReadSuccessNum(),
- pluginMetricNew.getReadFailedNum(), pluginMetricNew.getSendSuccessNum(),
- pluginMetricNew.getSendFailedNum());
+ LOGGER.info("destroy channel, show memory channel metric:");
+ GlobalMetrics.showMemoryChannelStatics();
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/GlobalMetrics.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/GlobalMetrics.java
new file mode 100644
index 000000000..9bd691e85
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/GlobalMetrics.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.metrics;
+
+import org.apache.inlong.agent.utils.ConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class GlobalMetrics {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GlobalMetrics.class);
+
+ // key: groupId_streamId
+ private static final ConcurrentHashMap<String, PluginMetric> pluginMetrics = new ConcurrentHashMap<>();
+ // key: sourceType_groupId_streamId
+ private static final ConcurrentHashMap<String, SourceMetric> sourceMetrics = new ConcurrentHashMap<>();
+ // key: sinkType_groupId_streamId
+ private static final ConcurrentHashMap<String, SinkMetric> sinkMetrics = new ConcurrentHashMap<>();
+
+ private static PluginMetric getPluginMetric(String tagName) {
+ return pluginMetrics.computeIfAbsent(tagName, (key) -> addPluginMetric(tagName));
+ }
+
+ private static PluginMetric addPluginMetric(String tagName) {
+ PluginMetric metric;
+ if (ConfigUtil.isPrometheusEnabled()) {
+ metric = new PluginPrometheusMetric(tagName);
+ } else {
+ metric = new PluginJmxMetric(tagName);
+ }
+ LOGGER.info("add {} pluginMetrics", tagName);
+ return metric;
+ }
+
+ private static SourceMetric getSourceMetric(String tagName) {
+ return sourceMetrics.computeIfAbsent(tagName, (key) -> addSourceMetric(tagName));
+ }
+
+ private static SourceMetric addSourceMetric(String tagName) {
+ SourceMetric metric;
+ if (ConfigUtil.isPrometheusEnabled()) {
+ metric = new SourcePrometheusMetric(tagName);
+ } else {
+ metric = new SourceJmxMetric(tagName);
+ }
+ LOGGER.info("add {} sourceMetric", tagName);
+ return metric;
+ }
+
+ private static SinkMetric getSinkMetric(String tagName) {
+ return sinkMetrics.computeIfAbsent(tagName, (key) -> addSinkMetric(tagName));
+ }
+
+ private static SinkMetric addSinkMetric(String tagName) {
+ SinkMetric metric;
+ if (ConfigUtil.isPrometheusEnabled()) {
+ metric = new SinkPrometheusMetric(tagName);
+ } else {
+ metric = new SinkJmxMetric(tagName);
+ }
+ LOGGER.info("add {} sinkMetric", tagName);
+ return metric;
+ }
+
+ public static void incReadNum(String tagName) {
+ getPluginMetric(tagName).incReadNum();
+ }
+
+ public static long getReadNum(String tagName) {
+ return getPluginMetric(tagName).getReadNum();
+ }
+
+ public static void incSendNum(String tagName) {
+ getPluginMetric(tagName).incSendNum();
+ }
+
+ public static long getSendNum(String tagName) {
+ return getPluginMetric(tagName).getSendNum(); // send counter of this tag, not the read counter
+ }
+
+ public static void incReadFailedNum(String tagName) {
+ getPluginMetric(tagName).incReadFailedNum();
+ }
+
+ public static long getReadFailedNum(String tagName) {
+ return getPluginMetric(tagName).getReadFailedNum();
+ }
+
+ public static void incSendFailedNum(String tagName) {
+ getPluginMetric(tagName).incSendFailedNum();
+ }
+
+ public static long getSendFailedNum(String tagName) {
+ return getPluginMetric(tagName).getSendFailedNum();
+ }
+
+ public static void incReadSuccessNum(String tagName) {
+ getPluginMetric(tagName).incReadSuccessNum();
+ }
+
+ public static long getReadSuccessNum(String tagName) {
+ return getPluginMetric(tagName).getReadSuccessNum();
+ }
+
+ public static void incSendSuccessNum(String tagName) {
+ getPluginMetric(tagName).incSendSuccessNum();
+ }
+
+ public static void incSendSuccessNum(String tagName, int delta) {
+ getPluginMetric(tagName).incSendSuccessNum(delta);
+ }
+
+ public static long getSendSuccessNum(String tagName) {
+ return getPluginMetric(tagName).getSendSuccessNum();
+ }
+
+ public static void incSinkSuccessCount(String tagName) {
+ getSinkMetric(tagName).incSinkSuccessCount();
+ }
+
+ public static long getSinkSuccessCount(String tagName) {
+ return getSinkMetric(tagName).getSinkSuccessCount();
+ }
+
+ public static void incSinkFailCount(String tagName) {
+ getSinkMetric(tagName).incSinkFailCount();
+ }
+
+ public static long getSinkFailCount(String tagName) {
+ return getSinkMetric(tagName).getSinkFailCount();
+ }
+
+ public static void incSourceSuccessCount(String tagName) {
+ getSourceMetric(tagName).incSourceSuccessCount();
+ }
+
+ public static long getSourceSuccessCount(String tagName) {
+ return getSourceMetric(tagName).getSourceSuccessCount();
+ }
+
+ public static void incSourceFailCount(String tagName) {
+ getSourceMetric(tagName).incSourceFailCount();
+ }
+
+ public static void showMemoryChannelStatics() {
+ for (Entry<String, PluginMetric> entry : pluginMetrics.entrySet()) {
+ LOGGER.info("tagName:{} ### readNum: {}, readSuccessNum: {}, readFailedNum: {}, sendSuccessNum: {}, "
+ + "sendFailedNum: {}", entry.getKey(), entry.getValue().getReadNum(),
+ entry.getValue().getReadSuccessNum(), entry.getValue().getReadFailedNum(),
+ entry.getValue().getSendSuccessNum(), entry.getValue().getSendFailedNum());
+ }
+ }
+
+ public static long getSourceFailCount(String tagName) { // static, like every other accessor in this class
+ return getSourceMetric(tagName).getSourceFailCount();
+ }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginJmxMetric.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginJmxMetric.java
index 5e75c3a2a..93c7e8e85 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginJmxMetric.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginJmxMetric.java
@@ -19,14 +19,14 @@
package org.apache.inlong.agent.plugin.metrics;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.inlong.agent.metrics.Metric;
import org.apache.inlong.common.metric.Dimension;
import org.apache.inlong.common.metric.MetricDomain;
import org.apache.inlong.common.metric.MetricItem;
import org.apache.inlong.common.metric.MetricRegister;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* metrics for agent plugin
*/
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginPrometheusMetric.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginPrometheusMetric.java
index 7b931712f..2f1f90236 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginPrometheusMetric.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginPrometheusMetric.java
@@ -23,8 +23,8 @@ import io.prometheus.client.Counter;
public class PluginPrometheusMetric implements PluginMetric {
+ // agent-metrics
public static final String AGENT_PLUGIN_METRICS_PREFIX = "inlong_agent_plugin_";
-
public static final String READ_NUM_COUNTER_NAME = "read_num_count";
public static final String SEND_NUM_COUNTER_NAME = "send_num_count";
public static final String READ_FAILED_NUM_COUNTER_NAME = "read_failed_num_count";
@@ -32,44 +32,40 @@ public class PluginPrometheusMetric implements PluginMetric {
public static final String READ_SUCCESS_NUM_COUNTER_NAME = "read_success_num_count";
public static final String SEND_SUCCESS_NUM_COUNTER_NAME = "send_success_num_count";
- private final String tagName;
-
- private static final Counter READ_NUM_COUNTER = Counter.build()
+ // agent-counters
+ private final Counter readNumCounter = Counter.build()
.name(AGENT_PLUGIN_METRICS_PREFIX + READ_NUM_COUNTER_NAME)
.help("The total number of reads.")
.labelNames("tag")
.register();
-
- private static final Counter SEND_NUM_COUNTER = Counter.build()
+ private final Counter sendNumCounter = Counter.build()
.name(AGENT_PLUGIN_METRICS_PREFIX + SEND_NUM_COUNTER_NAME)
.help("The total number of sends.")
.labelNames("tag")
.register();
-
- private static final Counter READ_FAILED_NUM_COUNTER = Counter.build()
+ private final Counter readFailedNumCounter = Counter.build()
.name(AGENT_PLUGIN_METRICS_PREFIX + READ_FAILED_NUM_COUNTER_NAME)
.help("The total number of failed reads.")
.labelNames("tag")
.register();
-
- private static final Counter SEND_FAILED_NUM_COUNTER = Counter.build()
+ private final Counter sendFailedNumCounter = Counter.build()
.name(AGENT_PLUGIN_METRICS_PREFIX + SEND_FAILED_NUM_COUNTER_NAME)
.help("The total number of failed sends.")
.labelNames("tag")
.register();
-
- private static final Counter READ_SUCCESS_NUM_COUNTER = Counter.build()
+ private final Counter readSuccessNumCounter = Counter.build()
.name(AGENT_PLUGIN_METRICS_PREFIX + READ_SUCCESS_NUM_COUNTER_NAME)
.help("The total number of successful reads.")
.labelNames("tag")
.register();
-
- private static final Counter SEND_SUCCESS_NUM_COUNTER = Counter.build()
+ private final Counter sendSuccessNumCounter = Counter.build()
.name(AGENT_PLUGIN_METRICS_PREFIX + SEND_SUCCESS_NUM_COUNTER_NAME)
.help("The total number of successful sends.")
.labelNames("tag")
.register();
+ private String tagName;
+
public PluginPrometheusMetric(String tagName) {
this.tagName = tagName;
}
@@ -81,66 +77,66 @@ public class PluginPrometheusMetric implements PluginMetric {
@Override
public void incReadNum() {
- READ_NUM_COUNTER.labels(tagName).inc();
+ readNumCounter.labels(tagName).inc();
}
@Override
public long getReadNum() {
- return (long) READ_NUM_COUNTER.labels(tagName).get();
+ return (long) readNumCounter.labels(tagName).get();
}
@Override
public void incSendNum() {
- SEND_NUM_COUNTER.labels(tagName).inc();
+ sendNumCounter.labels(tagName).inc();
}
@Override
public long getSendNum() {
- return (long) SEND_NUM_COUNTER.labels(tagName).get();
+ return (long) sendNumCounter.labels(tagName).get();
}
@Override
public void incReadFailedNum() {
- READ_FAILED_NUM_COUNTER.labels(tagName).inc();
+ readFailedNumCounter.labels(tagName).inc();
}
@Override
public long getReadFailedNum() {
- return (long) READ_FAILED_NUM_COUNTER.labels(tagName).get();
+ return (long) readFailedNumCounter.labels(tagName).get();
}
@Override
public void incSendFailedNum() {
- SEND_FAILED_NUM_COUNTER.labels(tagName).inc();
+ sendFailedNumCounter.labels(tagName).inc();
}
@Override
public long getSendFailedNum() {
- return (long) SEND_FAILED_NUM_COUNTER.labels(tagName).get();
+ return (long) sendFailedNumCounter.labels(tagName).get();
}
@Override
public void incReadSuccessNum() {
- READ_SUCCESS_NUM_COUNTER.labels(tagName).inc();
+ readSuccessNumCounter.labels(tagName).inc();
}
@Override
public long getReadSuccessNum() {
- return (long) READ_SUCCESS_NUM_COUNTER.labels(tagName).get();
+ return (long) readSuccessNumCounter.labels(tagName).get();
}
@Override
public void incSendSuccessNum() {
- SEND_SUCCESS_NUM_COUNTER.labels(tagName).inc();
+ sendSuccessNumCounter.labels(tagName).inc();
}
@Override
public void incSendSuccessNum(int delta) {
- SEND_SUCCESS_NUM_COUNTER.labels(tagName).inc(delta);
+ sendSuccessNumCounter.labels(tagName).inc(delta);
}
@Override
public long getSendSuccessNum() {
- return (long) SEND_SUCCESS_NUM_COUNTER.labels(tagName).get();
+ return (long) sendSuccessNumCounter.labels(tagName).get();
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkJmxMetric.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkJmxMetric.java
index 612eeadd7..b4cf365e0 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkJmxMetric.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkJmxMetric.java
@@ -28,7 +28,7 @@ import org.apache.inlong.common.metric.MetricRegister;
import java.util.concurrent.atomic.AtomicLong;
@MetricDomain(name = "SinkMetric")
-public class SinkJmxMetric extends MetricItem implements SinkMetrics {
+public class SinkJmxMetric extends MetricItem implements SinkMetric {
@Dimension
private final String tagName;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetrics.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetric.java
similarity index 97%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetrics.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetric.java
index 7bf325644..a9e6d92cb 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetrics.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetric.java
@@ -19,7 +19,7 @@
package org.apache.inlong.agent.plugin.metrics;
-public interface SinkMetrics {
+public interface SinkMetric {
/**
* The tag name of sink metrics.
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetrics.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetric.java
similarity index 80%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetrics.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetric.java
index 61482f6ef..16fe4f6d2 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetrics.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetric.java
@@ -21,7 +21,7 @@ package org.apache.inlong.agent.plugin.metrics;
import io.prometheus.client.Counter;
-public class SinkPrometheusMetrics implements SinkMetrics {
+public class SinkPrometheusMetric implements SinkMetric {
public static final String AGENT_SINK_METRICS_PREFIX = "inlong_agent_sink_";
@@ -30,19 +30,19 @@ public class SinkPrometheusMetrics implements SinkMetrics {
private final String tagName;
- private static final Counter SINK_SUCCESS_COUNTER = Counter.build()
+ private final Counter sinkSuccessCounter = Counter.build()
.name(AGENT_SINK_METRICS_PREFIX + SINK_SUCCESS_COUNTER_NAME)
.help("The success message count in agent sink since agent started.")
.labelNames("tag")
.register();
- private static final Counter SINK_FAIL_COUNTER = Counter.build()
+ private final Counter sinkFailCounter = Counter.build()
.name(AGENT_SINK_METRICS_PREFIX + SINK_FAIL_COUNTER_NAME)
.help("The failed message count in agent sink since agent started.")
.labelNames("tag")
.register();
- public SinkPrometheusMetrics(String tagName) {
+ public SinkPrometheusMetric(String tagName) {
this.tagName = tagName;
}
@@ -53,21 +53,21 @@ public class SinkPrometheusMetrics implements SinkMetrics {
@Override
public void incSinkSuccessCount() {
- SINK_SUCCESS_COUNTER.labels(tagName).inc();
+ sinkSuccessCounter.labels(tagName).inc();
}
@Override
public long getSinkSuccessCount() {
- return (long) SINK_SUCCESS_COUNTER.labels(tagName).get();
+ return (long) sinkSuccessCounter.labels(tagName).get();
}
@Override
public void incSinkFailCount() {
- SINK_FAIL_COUNTER.labels(tagName).inc();
+ sinkFailCounter.labels(tagName).inc();
}
@Override
public long getSinkFailCount() {
- return (long) SINK_FAIL_COUNTER.labels(tagName).get();
+ return (long) sinkFailCounter.labels(tagName).get();
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceJmxMetric.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceJmxMetric.java
index 3ab89452d..2f551f877 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceJmxMetric.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceJmxMetric.java
@@ -28,7 +28,7 @@ import org.apache.inlong.common.metric.MetricRegister;
import java.util.concurrent.atomic.AtomicLong;
@MetricDomain(name = "SourceMetric")
-public class SourceJmxMetric extends MetricItem implements SourceMetrics {
+public class SourceJmxMetric extends MetricItem implements SourceMetric {
@Dimension
private final String tagName;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetrics.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetric.java
similarity index 88%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetrics.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetric.java
index e71bda702..47b24721f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetrics.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetric.java
@@ -19,10 +19,10 @@
package org.apache.inlong.agent.plugin.metrics;
-public interface SourceMetrics {
+public interface SourceMetric {
/**
- * The tag name of source metrics.
+ * The tag name of source metrics.
*/
String getTagName();
@@ -32,7 +32,7 @@ public interface SourceMetrics {
void incSourceSuccessCount();
/**
- * Count of the source success metric.
+ * Count of the source success metric.
*/
long getSourceSuccessCount();
@@ -42,7 +42,7 @@ public interface SourceMetrics {
void incSourceFailCount();
/**
- * Count of the source fail metric.
+ * Count of the source fail metric.
*/
long getSourceFailCount();
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetrics.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetric.java
similarity index 77%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetrics.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetric.java
index 9fb52d5bd..babad8664 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetrics.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetric.java
@@ -19,28 +19,28 @@ package org.apache.inlong.agent.plugin.metrics;
import io.prometheus.client.Counter;
-public class SourcePrometheusMetrics implements SourceMetrics {
+public class SourcePrometheusMetric implements SourceMetric {
+ // agent-source-metrics
public static final String AGENT_SOURCE_METRICS_PREFIX = "inlong_agent_source_";
-
public static final String SOURCE_SUCCESS_COUNTER_NAME = "success_count";
public static final String SOURCE_FAIL_COUNTER_NAME = "fail_count";
- private final String tagName;
-
- private static final Counter SOURCE_SUCCESS_COUNTER = Counter.build()
+ // agent-source-counters
+ private final Counter sourceSuccessCounter = Counter.build()
.name(AGENT_SOURCE_METRICS_PREFIX + SOURCE_SUCCESS_COUNTER_NAME)
.help("The success message count in agent source since agent started.")
.labelNames("tag")
.register();
-
- private static final Counter SOURCE_FAIL_COUNTER = Counter.build()
+ private final Counter sourceFailCounter = Counter.build()
.name(AGENT_SOURCE_METRICS_PREFIX + SOURCE_FAIL_COUNTER_NAME)
.help("The failed message count in agent source since agent started.")
.labelNames("tag")
.register();
- public SourcePrometheusMetrics(String tagName) {
+ private String tagName;
+
+ public SourcePrometheusMetric(String tagName) {
this.tagName = tagName;
}
@@ -51,21 +51,21 @@ public class SourcePrometheusMetrics implements SourceMetrics {
@Override
public void incSourceSuccessCount() {
- SOURCE_SUCCESS_COUNTER.labels(tagName).inc();
+ sourceSuccessCounter.labels(tagName).inc();
}
@Override
public long getSourceSuccessCount() {
- return (long) SOURCE_SUCCESS_COUNTER.labels(tagName).get();
+ return (long) sourceSuccessCounter.labels(tagName).get();
}
@Override
public void incSourceFailCount() {
- SOURCE_FAIL_COUNTER.labels(tagName).inc();
+ sourceFailCounter.labels(tagName).inc();
}
@Override
public long getSourceFailCount() {
- return (long) SOURCE_FAIL_COUNTER.labels(tagName).get();
+ return (long) sourceFailCounter.labels(tagName).get();
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
index 1d1035a2f..d62ca2d4f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
@@ -17,14 +17,9 @@
package org.apache.inlong.agent.plugin.sinks;
-import com.google.common.base.Joiner;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.Sink;
-import org.apache.inlong.agent.plugin.metrics.SinkJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.SinkMetrics;
-import org.apache.inlong.agent.plugin.metrics.SinkPrometheusMetrics;
-import org.apache.inlong.agent.utils.ConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,12 +38,10 @@ public abstract class AbstractSink implements Sink {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSink.class);
- protected static SinkMetrics sinkMetric;
-
- protected static SinkMetrics streamMetric;
private static AtomicLong index = new AtomicLong(0);
protected String inlongGroupId;
protected String inlongStreamId;
+ protected String metricTagName;
@Override
public MessageFilter initMessageFilter(JobProfile jobConf) {
@@ -69,24 +62,4 @@ public abstract class AbstractSink implements Sink {
inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
}
- /**
- * init sinkMetric
- *
- * @param tagName metric tagName
- */
- protected void intMetric(String tagName) {
- String label = Joiner.on(",").join(tagName, index.getAndIncrement());
- if (ConfigUtil.isPrometheusEnabled()) {
- sinkMetric = new SinkPrometheusMetrics(label);
- } else {
- sinkMetric = new SinkJmxMetric(label);
- }
- label = Joiner.on(",").join(tagName, inlongGroupId, inlongStreamId);
- if (ConfigUtil.isPrometheusEnabled()) {
- streamMetric = new SinkPrometheusMetrics(label);
- } else {
- streamMetric = new SinkJmxMetric(label);
- }
- }
-
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
index 1471fd677..b6a14a1f3 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
@@ -17,10 +17,12 @@
package org.apache.inlong.agent.plugin.sinks;
-import java.nio.charset.StandardCharsets;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
+
+import java.nio.charset.StandardCharsets;
/**
* message write to console
@@ -38,12 +40,10 @@ public class ConsoleSink extends AbstractSink {
if (message != null) {
System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
// increment the count of successful sinks
- sinkMetric.incSinkSuccessCount();
- streamMetric.incSinkSuccessCount();
+ GlobalMetrics.incSinkSuccessCount(metricTagName);
} else {
// increment the count of failed sinks
- sinkMetric.incSinkFailCount();
- streamMetric.incSinkFailCount();
+ GlobalMetrics.incSinkFailCount(metricTagName);
}
}
@@ -60,7 +60,7 @@ public class ConsoleSink extends AbstractSink {
@Override
public void init(JobProfile jobConf) {
super.init(jobConf);
- intMetric(CONSOLE_SINK_TAG_NAME);
+ metricTagName = CONSOLE_SINK_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
}
@Override
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 97fa7d3d8..b97116ad3 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -27,6 +27,7 @@ import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.message.PackProxyMessage;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.slf4j.Logger;
@@ -111,12 +112,10 @@ public class ProxySink extends AbstractSink {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
inlongGroupId, inlongStreamId, System.currentTimeMillis());
// increment the count of successful sinks
- sinkMetric.incSinkSuccessCount();
- streamMetric.incSinkSuccessCount();
+ GlobalMetrics.incSinkSuccessCount(metricTagName);
} else {
// increment the count of failed sinks
- sinkMetric.incSinkFailCount();
- streamMetric.incSinkFailCount();
+ GlobalMetrics.incSinkFailCount(metricTagName);
}
}
} catch (Exception e) {
@@ -183,7 +182,7 @@ public class ProxySink extends AbstractSink {
@Override
public void init(JobProfile jobConf) {
super.init(jobConf);
- intMetric(PROXY_SINK_TAG_NAME);
+ metricTagName = PROXY_SINK_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
syncSend = jobConf.getBoolean(PROXY_SEND_SYNC, false);
maxBatchSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE);
maxQueueNumber = jobConf.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER,
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index c0183afb6..86ab06bb8 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -23,11 +23,8 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.core.task.TaskPositionManager;
import org.apache.inlong.agent.plugin.message.SequentialID;
-import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginPrometheusMetric;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
@@ -79,11 +76,9 @@ public class SenderManager {
private final int maxSenderRetry;
private final long retrySleepTime;
private final String inlongGroupId;
- private TaskPositionManager taskPositionManager;
private final int maxSenderPerGroup;
private final String sourcePath;
- private final PluginMetric metric;
-
+ private TaskPositionManager taskPositionManager;
private int ioThreadNum;
private boolean enableBusyWait;
private Semaphore semaphore;
@@ -125,12 +120,6 @@ public class SenderManager {
this.sourcePath = sourcePath;
this.inlongGroupId = inlongGroupId;
-
- if (ConfigUtil.isPrometheusEnabled()) {
- this.metric = new PluginPrometheusMetric(SENDER_MANAGER_TAG_NAME);
- } else {
- this.metric = new PluginJmxMetric(SENDER_MANAGER_TAG_NAME);
- }
}
/**
@@ -156,13 +145,13 @@ public class SenderManager {
/**
* sender
*
- * @param groupId group id
+ * @param tagName inlong group id handed to the proxy client config (named tagName here)
* @return DefaultMessageSender
*/
- private DefaultMessageSender createMessageSender(String groupId) throws Exception {
+ private DefaultMessageSender createMessageSender(String tagName) throws Exception {
ProxyClientConfig proxyClientConfig = new ProxyClientConfig(
- localhost, isLocalVisit, managerHost, managerPort, groupId, netTag);
+ localhost, isLocalVisit, managerHost, managerPort, tagName, netTag);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setFile(isFile);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
@@ -192,50 +181,6 @@ public class SenderManager {
senderList.add(sender);
}
- /**
- * sender callback
- */
- private class AgentSenderCallback implements SendMessageCallback {
-
- private final int retry;
- private final String groupId;
- private final List<byte[]> bodyList;
- private final String streamId;
- private final long dataTime;
- private final String jobId;
-
- AgentSenderCallback(String jobId, String groupId, String streamId, List<byte[]> bodyList, int retry,
- long dataTime) {
- this.retry = retry;
- this.groupId = groupId;
- this.streamId = streamId;
- this.bodyList = bodyList;
- this.jobId = jobId;
- this.dataTime = dataTime;
- }
-
- @Override
- public void onMessageAck(SendResult result) {
- // if send result is not ok, retry again.
- if (result == null || !result.equals(SendResult.OK)) {
- LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime {} fail with times {}, "
- + "error {}", groupId, streamId, jobId, dataTime, retry, result);
- sendBatchAsync(jobId, groupId, streamId, bodyList, retry + 1, dataTime);
- return;
- }
- semaphore.release(bodyList.size());
- metric.incSendSuccessNum(bodyList.size());
- if (sourcePath != null) {
- taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size());
- }
- }
-
- @Override
- public void onException(Throwable e) {
- LOGGER.error("exception caught", e);
- }
- }
-
/**
* Send message to proxy by batch, use message cache.
*
@@ -303,4 +248,48 @@ public class SenderManager {
}
}
+ /**
+ * sender callback
+ */
+ private class AgentSenderCallback implements SendMessageCallback {
+
+ private final int retry;
+ private final String groupId;
+ private final List<byte[]> bodyList;
+ private final String streamId;
+ private final long dataTime;
+ private final String jobId;
+
+ AgentSenderCallback(String jobId, String groupId, String streamId, List<byte[]> bodyList, int retry,
+ long dataTime) {
+ this.retry = retry;
+ this.groupId = groupId;
+ this.streamId = streamId;
+ this.bodyList = bodyList;
+ this.jobId = jobId;
+ this.dataTime = dataTime;
+ }
+
+ @Override
+ public void onMessageAck(SendResult result) {
+ // if send result is not ok, retry again.
+ if (result == null || !result.equals(SendResult.OK)) {
+ LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime {} fail with times {}, "
+ + "error {}", groupId, streamId, jobId, dataTime, retry, result);
+ sendBatchAsync(jobId, groupId, streamId, bodyList, retry + 1, dataTime);
+ return;
+ }
+ semaphore.release(bodyList.size());
+ GlobalMetrics.incSendSuccessNum(groupId + "_" + streamId, bodyList.size());
+ if (sourcePath != null) {
+ taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size());
+ }
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ LOGGER.error("exception caught", e);
+ }
+ }
+
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java
index f13e1f49c..dfb0aba07 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java
@@ -20,18 +20,18 @@ package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.metrics.SourceJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.SourceMetrics;
-import org.apache.inlong.agent.plugin.metrics.SourcePrometheusMetrics;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.sources.reader.BinlogReader;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
/**
* binlog source, split binlog source job into multi readers
@@ -40,25 +40,19 @@ public class BinlogSource implements Source {
private static final Logger LOGGER = LoggerFactory.getLogger(BinlogSource.class);
private static final String BINLOG_SOURCE_TAG_NAME = "BinlogSourceMetric";
- private static AtomicLong metricsIndex = new AtomicLong(0);
- private final SourceMetrics sourceMetrics;
public BinlogSource() {
- if (ConfigUtil.isPrometheusEnabled()) {
- this.sourceMetrics = new SourcePrometheusMetrics(AgentUtils.getUniqId(
- BINLOG_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
- } else {
- this.sourceMetrics = new SourceJmxMetric(AgentUtils.getUniqId(
- BINLOG_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
- }
}
@Override
public List<Reader> split(JobProfile conf) {
+ String inlongGroupId = conf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+ String inlongStreamId = conf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+ String metricTagName = BINLOG_SOURCE_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
Reader binlogReader = new BinlogReader();
List<Reader> readerList = new ArrayList<>();
readerList.add(binlogReader);
- sourceMetrics.incSourceSuccessCount();
+ GlobalMetrics.incSourceSuccessCount(metricTagName);
return readerList;
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
index 1a0bcbda9..ee13ddb71 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
@@ -17,43 +17,38 @@
package org.apache.inlong.agent.plugin.sources;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.metrics.SourceJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.SourceMetrics;
-import org.apache.inlong.agent.plugin.metrics.SourcePrometheusMetrics;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.sources.reader.SqlReader;
import org.apache.inlong.agent.utils.AgentDbUtils;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+
/**
* Make database as Source
*/
-public class DatabaseSqlSource implements Source {
+public class DatabaseSqlSource implements Source {
+
private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseSqlSource.class);
private static final String JOB_DATABASE_SQL = "job.sql.command";
private static final String DATABASE_SOURCE_TAG_NAME = "AgentDatabaseSourceMetric";
- private final SourceMetrics sourceMetrics;
private static AtomicLong metricsIndex = new AtomicLong(0);
public DatabaseSqlSource() {
- if (ConfigUtil.isPrometheusEnabled()) {
- this.sourceMetrics = new SourcePrometheusMetrics(AgentUtils.getUniqId(
- DATABASE_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
- } else {
- this.sourceMetrics = new SourceJmxMetric(AgentUtils.getUniqId(
- DATABASE_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
- }
}
/**
@@ -81,6 +76,9 @@ public class DatabaseSqlSource implements Source {
*/
@Override
public List<Reader> split(JobProfile conf) {
+ String inlongGroupId = conf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+ String inlongStreamId = conf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+ String metricTagName = DATABASE_SOURCE_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
String sqlPattern = conf.get(JOB_DATABASE_SQL, "").toLowerCase();
List<Reader> readerList = null;
if (!sqlPattern.isEmpty()) {
@@ -88,11 +86,11 @@ public class DatabaseSqlSource implements Source {
}
if (readerList != null) {
// increment the count of successful sources
- sourceMetrics.incSourceSuccessCount();
+ GlobalMetrics.incSourceSuccessCount(metricTagName);
} else {
// database type or sql is incorrect
// increment the count of failed sources
- sourceMetrics.incSourceFailCount();
+ GlobalMetrics.incSourceFailCount(metricTagName);
}
return readerList;
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index 87af9c4e6..a817c4e12 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -23,12 +23,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.metrics.SourceJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.SourceMetrics;
-import org.apache.inlong.agent.plugin.metrics.SourcePrometheusMetrics;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.sources.reader.KafkaReader;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -42,6 +38,10 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET;
@@ -71,21 +71,15 @@ public class KafkaSource implements Source {
private static final String KAFKA_SESSION_TIMEOUT = "session.timeout.ms";
private static final Gson gson = new Gson();
private static AtomicLong metricsIndex = new AtomicLong(0);
- private final SourceMetrics sourceMetrics;
public KafkaSource() {
- if (ConfigUtil.isPrometheusEnabled()) {
- this.sourceMetrics = new SourcePrometheusMetrics(AgentUtils.getUniqId(
- KAFKA_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
- } else {
- this.sourceMetrics = new SourceJmxMetric(AgentUtils.getUniqId(
- KAFKA_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
- }
-
}
@Override
public List<Reader> split(JobProfile conf) {
+ String inlongGroupId = conf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+ String inlongStreamId = conf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+ String metricTagName = KAFKA_SOURCE_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
List<Reader> result = new ArrayList<>();
String filterPattern = conf.get(JOB_LINE_FILTER_PATTERN, DEFAULT_JOB_LINE_FILTER);
@@ -141,6 +135,9 @@ public class KafkaSource implements Source {
addValidator(filterPattern, kafkaReader);
result.add(kafkaReader);
}
+ GlobalMetrics.incSourceSuccessCount(metricTagName);
+ } else {
+ GlobalMetrics.incSourceFailCount(metricTagName);
}
return result;
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 9baeb352c..d1258685e 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -20,13 +20,9 @@ package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.metrics.SourceJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.SourceMetrics;
-import org.apache.inlong.agent.plugin.metrics.SourcePrometheusMetrics;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.sources.reader.TextFileReader;
import org.apache.inlong.agent.plugin.utils.PluginUtils;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +32,11 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN;
@@ -47,28 +47,20 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOU
*/
public class TextFileSource implements Source {
- private static final Logger LOGGER = LoggerFactory.getLogger(TextFileSource.class);
-
- private static final String TEXT_FILE_SOURCE_TAG_NAME = "AgentTextFileSourceMetric";
-
// path + suffix
public static final String MD5_SUFFIX = ".md5";
-
- private final SourceMetrics sourceMetrics;
+ private static final Logger LOGGER = LoggerFactory.getLogger(TextFileSource.class);
+ private static final String TEXT_FILE_SOURCE_TAG_NAME = "AgentTextFileSourceMetric";
private static AtomicLong metricsIndex = new AtomicLong(0);
public TextFileSource() {
- if (ConfigUtil.isPrometheusEnabled()) {
- this.sourceMetrics = new SourcePrometheusMetrics(AgentUtils.getUniqId(
- TEXT_FILE_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
- } else {
- this.sourceMetrics = new SourceJmxMetric(AgentUtils.getUniqId(
- TEXT_FILE_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
- }
}
@Override
public List<Reader> split(JobProfile jobConf) {
+ String inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+ String inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+ String metricTagName = TEXT_FILE_SOURCE_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
Collection<File> allFiles = PluginUtils.findSuitFiles(jobConf);
List<Reader> result = new ArrayList<>();
String filterPattern = jobConf.get(JOB_LINE_FILTER_PATTERN, DEFAULT_JOB_LINE_FILTER);
@@ -84,7 +76,7 @@ public class TextFileSource implements Source {
result.add(textFileReader);
}
// increment the count of successful sources
- sourceMetrics.incSourceSuccessCount();
+ GlobalMetrics.incSourceSuccessCount(metricTagName);
return result;
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
index 7f5566d0e..e9ed0ac62 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
@@ -17,35 +17,23 @@
package org.apache.inlong.agent.plugin.sources.reader;
-import com.google.common.base.Joiner;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
-import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginPrometheusMetric;
-import org.apache.inlong.agent.utils.ConfigUtil;
-
-import java.util.concurrent.atomic.AtomicLong;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.KEY_METRICS_INDEX;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
/**
* abstract reader, init reader and reader metrics
*/
public abstract class AbstractReader implements Reader {
- protected static PluginMetric readerMetric;
- protected static PluginMetric streamMetric;
- private static AtomicLong metricsIndex = new AtomicLong(0);
protected String inlongGroupId;
protected String inlongStreamId;
+ protected String metricTagName;
@Override
public void init(JobProfile jobConf) {
@@ -53,30 +41,7 @@ public abstract class AbstractReader implements Reader {
inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
}
- /**
- * init reader metrics
- *
- * @param tagName metric tagName
- */
- protected void intMetric(String tagName) {
- String metricsIndexValue = String.valueOf(metricsIndex.getAndIncrement());
- String label = Joiner.on(",").join(tagName, metricsIndexValue);
- String groupIdKV = PROXY_KEY_GROUP_ID + "=" + inlongGroupId;
- String streamIdKV = PROXY_KEY_STREAM_ID + "=" + inlongStreamId;
- String metricsIndexKV = KEY_METRICS_INDEX + "=" + metricsIndexValue;
- if (ConfigUtil.isPrometheusEnabled()) {
- readerMetric = new PluginPrometheusMetric(label);
- } else {
- label = Joiner.on(",").join(tagName, metricsIndexKV);
- readerMetric = new PluginJmxMetric(label);
- }
- label = Joiner.on(",").join(tagName, inlongGroupId, inlongStreamId);
- if (ConfigUtil.isPrometheusEnabled()) {
- streamMetric = new PluginPrometheusMetric(label);
- } else {
- label = Joiner.on(",").join(tagName, groupIdKV, streamIdKV);
- streamMetric = new PluginJmxMetric(label);
- }
+ public String getInlongGroupId() {
+ return inlongGroupId;
}
-
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 9ca78b5f7..7b7961a48 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -31,7 +31,7 @@ import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.constant.SnapshotModeConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
@@ -60,7 +60,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
/**
* read binlog data
*/
-public class BinlogReader implements Reader {
+public class BinlogReader extends AbstractReader {
public static final String COMPONENT_NAME = "BinlogReader";
public static final String JOB_DATABASE_USER = "job.binlogJob.user";
@@ -81,6 +81,7 @@ public class BinlogReader implements Reader {
public static final String JOB_DATABASE_QUEUE_SIZE = "job.binlogJob.queueSize";
private static final Logger LOGGER = LoggerFactory.getLogger(BinlogReader.class);
private static final Gson gson = new Gson();
+ private static final String BINLOG_READER_TAG_NAME = "AgentBinlogMetric";
private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
/**
* pair.left: table name
@@ -120,6 +121,7 @@ public class BinlogReader implements Reader {
@Override
public Message read() {
if (!binlogMessagesQueue.isEmpty()) {
+ GlobalMetrics.incReadNum(metricTagName);
return getBinlogMessage();
} else {
return null;
@@ -171,6 +173,7 @@ public class BinlogReader implements Reader {
CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID);
inlongStreamId = jobConf.get(CommonConstants.PROXY_INLONG_STREAM_ID,
CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID);
+ metricTagName = BINLOG_READER_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
if (enableReportConfigLog) {
String reportConfigServerUrl = jobConf
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index fe28c79a6..93910e99a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -23,14 +23,10 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Validator;
-import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginPrometheusMetric;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.validator.PatternValidator;
import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -48,10 +44,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_BYTE_SPEED_LIMIT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_OFFSET;
import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_PARTITION_OFFSET_DELIMITER;
@@ -61,7 +53,7 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_TOPIC;
/**
* read kafka data
*/
-public class KafkaReader<K, V> implements Reader {
+public class KafkaReader<K, V> extends AbstractReader {
public static final int NEVER_STOP_SIGN = -1;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReader.class);
@@ -77,7 +69,6 @@ public class KafkaReader<K, V> implements Reader {
/* total readBytes */
private static AtomicLong currentTotalReadBytes = new AtomicLong(0);
private static AtomicLong lastTotalReadBytes = new AtomicLong(0);
- private final PluginMetric kafkaMetric;
KafkaConsumer<K, V> consumer;
long lastTimestamp;
/* bps: records/s */
@@ -103,18 +94,9 @@ public class KafkaReader<K, V> implements Reader {
*/
public KafkaReader(KafkaConsumer<K, V> consumer, Map<String, String> paraMap) {
this.consumer = consumer;
- // metrics total readRecords
- if (ConfigUtil.isPrometheusEnabled()) {
- kafkaMetric = new PluginPrometheusMetric(AgentUtils.getUniqId(
- KAFKA_READER_TAG_NAME, currentTotalReadRecords.incrementAndGet()));
- } else {
- kafkaMetric = new PluginJmxMetric(AgentUtils.getUniqId(
- KAFKA_READER_TAG_NAME, currentTotalReadRecords.incrementAndGet()));
- }
-
- this.recordSpeed = Long.valueOf(paraMap.getOrDefault(JOB_KAFKA_RECORD_SPEED_LIMIT, "10000"));
- this.byteSpeed = Long.valueOf(paraMap.getOrDefault(JOB_KAFKA_BYTE_SPEED_LIMIT, String.valueOf(1024 * 1024)));
- this.flowControlInterval = Long.valueOf(paraMap.getOrDefault(KAFKA_SOURCE_READ_MIN_INTERVAL, "1000"));
+ this.recordSpeed = Long.parseLong(paraMap.getOrDefault(JOB_KAFKA_RECORD_SPEED_LIMIT, "10000"));
+ this.byteSpeed = Long.parseLong(paraMap.getOrDefault(JOB_KAFKA_BYTE_SPEED_LIMIT, String.valueOf(1024 * 1024)));
+ this.flowControlInterval = Long.parseLong(paraMap.getOrDefault(KAFKA_SOURCE_READ_MIN_INTERVAL, "1000"));
this.lastTimestamp = System.currentTimeMillis();
this.topic = paraMap.get(JOB_KAFKA_TOPIC);
@@ -140,7 +122,7 @@ public class KafkaReader<K, V> implements Reader {
"partition:" + record.partition()
+ ", value:" + new String(recordValue) + ", offset:" + record.offset());
// control speed
- kafkaMetric.incReadNum();
+ GlobalMetrics.incReadNum(metricTagName);
// commit succeed,then record current offset
snapshot = record.partition() + JOB_KAFKA_PARTITION_OFFSET_DELIMITER + record.offset();
DefaultMessage message = new DefaultMessage(recordValue, headerMap);
@@ -188,13 +170,13 @@ public class KafkaReader<K, V> implements Reader {
@Override
public void init(JobProfile jobConf) {
+ super.init(jobConf);
+ metricTagName = KAFKA_READER_TAG_NAME + "_" + inlongGroupId;
// get offset from jobConf
snapshot = jobConf.get(JOB_KAFKA_OFFSET, null);
initReadTimeout(jobConf);
// fetch data
fetchData(5000);
- inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
- inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
}
@Override
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index a27a50522..33f4f04a9 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -17,21 +17,6 @@
package org.apache.inlong.agent.plugin.sources.reader;
-import static java.sql.Types.BINARY;
-import static java.sql.Types.BLOB;
-import static java.sql.Types.LONGVARBINARY;
-import static java.sql.Types.VARBINARY;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
-
-import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.CharUtils;
import org.apache.commons.lang3.StringUtils;
@@ -39,11 +24,25 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.utils.AgentDbUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BLOB;
+import static java.sql.Types.LONGVARBINARY;
+import static java.sql.Types.VARBINARY;
+
/**
* Read data from database by SQL
*/
@@ -119,16 +118,14 @@ public class SqlReader extends AbstractReader {
}
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, inlongStreamId, System.currentTimeMillis());
- readerMetric.incReadNum();
- streamMetric.incReadNum();
+ GlobalMetrics.incReadNum(metricTagName);
return generateMessage(lineColumns);
} else {
finished = true;
}
} catch (Exception ex) {
LOGGER.error("error while reading data", ex);
- readerMetric.incReadFailedNum();
- streamMetric.incReadFailedNum();
+ GlobalMetrics.incReadFailedNum(metricTagName);
throw new RuntimeException(ex);
}
return null;
@@ -193,9 +190,8 @@ public class SqlReader extends AbstractReader {
@Override
public void init(JobProfile jobConf) {
- inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);
- inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, "");
- intMetric(SQL_READER_TAG_NAME);
+ super.init(jobConf);
+ metricTagName = SQL_READER_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
int batchSize = jobConf.getInt(JOB_DATABASE_BATCH_SIZE, DEFAULT_JOB_DATABASE_BATCH_SIZE);
String userName = jobConf.get(JOB_DATABASE_USER);
String password = jobConf.get(JOB_DATABASE_PASSWORD);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index a627e7c98..7ec8e8ad1 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -24,6 +24,7 @@ import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Validator;
import org.apache.inlong.agent.plugin.except.FileException;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.validator.PatternValidator;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
@@ -88,10 +89,7 @@ public class TextFileReader extends AbstractReader {
if (validateMessage(message)) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, inlongStreamId, System.currentTimeMillis());
- if (streamMetric != null) {
- readerMetric.incReadNum();
- streamMetric.incReadNum();
- }
+ GlobalMetrics.incReadNum(metricTagName);
return new DefaultMessage(message.getBytes(StandardCharsets.UTF_8));
}
}
@@ -166,7 +164,7 @@ public class TextFileReader extends AbstractReader {
public void init(JobProfile jobConf) {
try {
super.init(jobConf);
- intMetric(TEXT_FILE_READER_TAG_NAME);
+ metricTagName = TEXT_FILE_READER_TAG_NAME + "_" + inlongGroupId;
initReadTimeout(jobConf);
String md5 = AgentUtils.getFileMd5(file);
if (StringUtils.isNotBlank(this.md5) && !this.md5.equals(md5)) {
@@ -197,6 +195,6 @@ public class TextFileReader extends AbstractReader {
}
AgentUtils.finallyClose(stream);
LOGGER.info("destroy reader with read {} num {}",
- streamMetric.getTagName(), streamMetric.getReadNum());
+ metricTagName, GlobalMetrics.getReadNum(metricTagName));
}
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/metrics/GlobalMetricsTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/metrics/GlobalMetricsTest.java
new file mode 100644
index 000000000..2fa69a7ed
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/metrics/GlobalMetricsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.metrics;
+
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertEquals;
+
+/** Unit tests for the tag-keyed counters exposed by {@link GlobalMetrics}. */
+public class GlobalMetricsTest {
+    private static final String GROUP_ID_1 = "groupId_test1";
+    private static final String GROUP_ID_2 = "groupId_test2";
+    private static final String STREAM_ID = "streamId";
+    private static final String SINK_TAG = "File_Sink";
+    private static final String SOURCE_TAG = "File_Source";
+
+    /** Read and send-success counters of two distinct tags must not influence each other. */
+    @Test
+    public void testPluginMetric() {
+        String tag1 = GROUP_ID_1 + "_" + STREAM_ID;
+        String tag2 = GROUP_ID_2 + "_" + STREAM_ID;
+        GlobalMetrics.incReadNum(tag1);
+        assertEquals(1, GlobalMetrics.getReadNum(tag1));
+        assertEquals(0, GlobalMetrics.getSendSuccessNum(tag2));
+        GlobalMetrics.incSendSuccessNum(tag2, 10);
+        assertEquals(10, GlobalMetrics.getSendSuccessNum(tag2));
+        GlobalMetrics.incSendSuccessNum(tag2);
+        assertEquals(11, GlobalMetrics.getSendSuccessNum(tag2));
+    }
+
+    /** One call to incSinkFailCount must be reported as a fail count of 1 for that tag. */
+    @Test
+    public void testSinkMetric() {
+        String tag = SINK_TAG + "_" + GROUP_ID_1 + "_" + STREAM_ID;
+        GlobalMetrics.incSinkFailCount(tag);
+        assertEquals(1, GlobalMetrics.getSinkFailCount(tag));
+    }
+
+    /** Source success counts are kept per tag: a tag that was never incremented reads 0. */
+    @Test
+    public void testSourceMetric() {
+        String tag1 = SOURCE_TAG + "_" + GROUP_ID_1 + "_" + STREAM_ID;
+        GlobalMetrics.incSourceSuccessCount(tag1);
+        GlobalMetrics.incSourceSuccessCount(tag1);
+        assertEquals(2, GlobalMetrics.getSourceSuccessCount(tag1));
+        String tag2 = SOURCE_TAG + "_" + GROUP_ID_2 + "_" + STREAM_ID;
+        assertEquals(0, GlobalMetrics.getSourceSuccessCount(tag2));
+        GlobalMetrics.incSourceSuccessCount(tag2);
+        assertEquals(1, GlobalMetrics.getSourceSuccessCount(tag2));
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
index 52f4bd09c..0ce57bccc 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
@@ -17,47 +17,35 @@
package org.apache.inlong.agent.plugin.sinks;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_CYCLE_UNIT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_DATA_TIME;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.core.task.TaskPositionManager;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.Sink;
-import org.apache.inlong.agent.plugin.metrics.SinkJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.SinkMetrics;
-import org.apache.inlong.agent.plugin.metrics.SinkPrometheusMetrics;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MockSink implements Sink {
+import java.util.concurrent.atomic.AtomicLong;
- private static final Logger LOGGER = LoggerFactory.getLogger(MockSink.class);
+import static org.apache.inlong.agent.constant.JobConstants.JOB_CYCLE_UNIT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_DATA_TIME;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
- public static final String MOCK_SINK_TAG_NAME = "AgentMockSinkMetric";
+public class MockSink implements Sink {
+ public static final String MOCK_SINK_TAG_NAME = "AgentMockSinkMetric";
+ private static final Logger LOGGER = LoggerFactory.getLogger(MockSink.class);
private final AtomicLong number = new AtomicLong(0);
+ public String tagName = MOCK_SINK_TAG_NAME + "_" + "groupIdTest" + "_" + "streamId";
private TaskPositionManager taskPositionManager;
private String sourceFileName;
private String jobInstanceId;
private long dataTime;
- private final SinkMetrics sinkMetrics;
- private static AtomicLong metricsIndex = new AtomicLong(0);
-
public MockSink() {
- if (ConfigUtil.isPrometheusEnabled()) {
- this.sinkMetrics = new SinkPrometheusMetrics(AgentUtils.getUniqId(
- MOCK_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
- } else {
- this.sinkMetrics = new SinkJmxMetric(AgentUtils.getUniqId(
- MOCK_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
- }
+
}
@Override
@@ -66,10 +54,10 @@ public class MockSink implements Sink {
number.incrementAndGet();
taskPositionManager.updateSinkPosition(jobInstanceId, sourceFileName, 1);
// increment the count of successful sinks
- sinkMetrics.incSinkSuccessCount();
+ GlobalMetrics.incSinkSuccessCount(tagName);
} else {
// increment the count of failed sinks
- sinkMetrics.incSinkFailCount();
+ GlobalMetrics.incSinkFailCount(tagName);
}
}
@@ -88,7 +76,7 @@ public class MockSink implements Sink {
taskPositionManager = TaskPositionManager.getTaskPositionManager();
jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
dataTime = AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
- jobConf.get(JOB_CYCLE_UNIT, ""));
+ jobConf.get(JOB_CYCLE_UNIT, ""));
LOGGER.info("get dataTime is : {}", dataTime);
}