You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by tr...@apache.org on 2022/10/30 19:31:16 UTC
[flume] branch trunk updated: Add support for making metrics available to Prometheus, including exposing Kafka Producer and Consumer metrics. (#358)
This is an automated email from the ASF dual-hosted git repository.
tristan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1821a17e6 Add support for making metrics available to Prometheus, including exposing Kafka Producer and Consumer metrics. (#358)
1821a17e6 is described below
commit 1821a17e6fe8bf02d479daa10c47e357d1cc1b98
Author: Tristan Stevens <tm...@users.noreply.github.com>
AuthorDate: Sun Oct 30 19:31:08 2022 +0000
Add support for making metrics available to Prometheus, including exposing Kafka Producer and Consumer metrics. (#358)
Co-authored-by: Tristan Stevens <tr...@cloudera.com>
---
flume-ng-core/pom.xml | 10 +
.../flume/instrumentation/MonitoringType.java | 3 +-
.../instrumentation/http/HTTPMetricsServer.java | 6 +-
.../http/PrometheusHTTPMetricsServer.java | 295 +++++++++++++++++++++
.../instrumentation/http/BaseHTTPMetricsTest.java | 83 ++++++
.../http/TestHTTPMetricsServer.java | 64 +----
.../http/TestPrometheusMetricsServer.java | 81 ++++++
pom.xml | 13 +
8 files changed, 498 insertions(+), 57 deletions(-)
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index 7ae90a94e..7b9455ae7 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -408,6 +408,16 @@ limitations under the License.
<artifactId>mina-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_servlet</artifactId>
+ </dependency>
+
</dependencies>
</project>
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
index 4e1a28c26..f0deec9ca 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
@@ -25,7 +25,8 @@ package org.apache.flume.instrumentation;
public enum MonitoringType {
OTHER(null),
GANGLIA(org.apache.flume.instrumentation.GangliaServer.class),
- HTTP(org.apache.flume.instrumentation.http.HTTPMetricsServer.class);
+ HTTP(org.apache.flume.instrumentation.http.HTTPMetricsServer.class),
+ PROMETHEUS(org.apache.flume.instrumentation.http.PrometheusHTTPMetricsServer.class);
private Class<? extends MonitorService> monitoringClass;
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
index ebb01fe13..efefb6e12 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
@@ -54,7 +54,7 @@ public class HTTPMetricsServer implements MonitorService {
private Server jettyServer;
private int port;
private static Logger LOG = LoggerFactory.getLogger(HTTPMetricsServer.class);
- public static int DEFAULT_PORT = 41414;
+ private static int DEFAULT_PORT = 41414;
public static String CONFIG_PORT = "port";
@Override
@@ -91,6 +91,10 @@ public class HTTPMetricsServer implements MonitorService {
}
+ public int getPort() {
+ return port;
+ }
+
@Override
public void configure(Context context) {
port = context.getInteger(CONFIG_PORT, DEFAULT_PORT);
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
new file mode 100644
index 000000000..b8c4c15f6
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.instrumentation.http;
+
+import com.google.common.base.Throwables;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.GaugeMetricFamily;
+import io.prometheus.client.exporter.MetricsServlet;
+import org.apache.flume.instrumentation.MonitorService;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * A Monitor service implementation that runs a web server on a configurable
+ * port and serves component metrics to Prometheus at <tt>/metrics</tt>. <p>
+ * Optional parameters: <p> <tt>port</tt> : The port on which the server
+ * should listen.<p> Metrics are read from JMX: Flume MBeans are published
+ * with a <tt>Flume_</tt> prefix and a <tt>component</tt> label, and Kafka
+ * producer/consumer metrics MBeans are published as gauges. <p> Example: <p>
+ *
+ * Flume_ChannelSize{component="memChannel",} 1.0
+ */
+public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements MonitorService {
+
+ private static final String PROM_DEFAULT_PREFIX = "Flume_";
+ private Server jettyServer;
+ private static Logger LOG = LoggerFactory.getLogger(PrometheusHTTPMetricsServer.class);
+ private static MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+ private FlumePrometheusCollector requests;
+
+ @Override
+ public void start() {
+
+ requests = new FlumePrometheusCollector().register();
+
+ jettyServer = new Server();
+ //A single servlet context rooted at "/" hosts the Prometheus MetricsServlet,
+ //which is mapped to /metrics below.
+ HttpConfiguration httpConfiguration = new HttpConfiguration();
+ ServerConnector connector = new ServerConnector(jettyServer,
+ new HttpConnectionFactory(httpConfiguration));
+ connector.setReuseAddress(true);
+ connector.setPort(getPort());
+ jettyServer.addConnector(connector);
+ ServletContextHandler context = new ServletContextHandler();
+ context.setContextPath("/");
+ jettyServer.setHandler(context);
+ context.addServlet(new ServletHolder(new MetricsServlet()),"/metrics");
+ try {
+ jettyServer.start();
+ while (!jettyServer.isStarted()) {
+ Thread.sleep(500);
+ }
+ } catch (Exception ex) {
+ LOG.error("Error starting Jetty. JSON Metrics may not be available.", ex);
+ }
+
+ }
+
+ class FlumePrometheusCollector extends Collector {
+
+ public List<MetricFamilySamples> collect() {
+
+ Map<Object, Map<String, MetricFamilySamples>> counterMetricMap = new HashMap<>();
+ List<Collector.MetricFamilySamples> mfs = new ArrayList<>();
+
+ Set<ObjectInstance> queryMBeans;
+ try {
+ queryMBeans = mbeanServer.queryMBeans(null, null);
+
+ for (ObjectInstance obj : queryMBeans) {
+ try {
+ if (obj.getObjectName().toString().startsWith("org.apache.flume")) {
+ processFlumeMetric(counterMetricMap, mfs, obj);
+ } else if ((obj.getObjectName().toString().startsWith("kafka.consumer") ||
+ obj.getObjectName().toString().startsWith("kafka.producer"))
+ && obj.getObjectName().toString().contains("metrics")) {
+ processKafkaMetric(counterMetricMap, mfs, obj);
+ }
+
+
+
+ } catch (Exception e) {
+ LOG.error("Unable to poll JMX for metrics.", e);
+ }
+
+ }
+ return mfs;
+
+ } catch (Exception ex) {
+ LOG.error("Could not get Mbeans for monitoring", ex);
+ Throwables.propagate(ex);
+ return null;
+ }
+ }
+
+ private void processFlumeMetric(Map<Object, Map<String, MetricFamilySamples>> counterMetricMap,
+ List<MetricFamilySamples> mfs, ObjectInstance obj)
+ throws ClassNotFoundException, InstanceNotFoundException,
+ IntrospectionException, ReflectionException {
+ Class mbeanClass = Class.forName(obj.getClassName());
+ Map<String, MetricFamilySamples> metricsMap;
+
+ if (!counterMetricMap.containsKey(mbeanClass)) {
+ metricsMap = new HashMap<>();
+
+ for (Method method : mbeanClass.getMethods()) {
+ String methodName = method.getName();
+ if (methodName.startsWith("increment") && methodName.length() > "increment".length()) {
+ String counterName = PROM_DEFAULT_PREFIX + methodName.substring("increment".length());
+ createCounterIfNotExists(mfs, metricsMap, counterName);
+ } else if (methodName.startsWith("addTo")) {
+ String counterName = PROM_DEFAULT_PREFIX + methodName.substring("addTo".length());
+ createCounterIfNotExists(mfs, metricsMap, counterName);
+ } else if (methodName.startsWith("set")) {
+ String counterName = PROM_DEFAULT_PREFIX + methodName.substring("set".length());
+ createGaugeIfNotExists(mfs, metricsMap, counterName, Arrays.asList("component"));
+ }
+ }
+
+ counterMetricMap.put(mbeanClass, metricsMap);
+
+ } else {
+ metricsMap = counterMetricMap.get(mbeanClass);
+ }
+
+ MBeanAttributeInfo[] attrs = mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
+ String[] strAtts = new String[attrs.length];
+ for (int i = 0; i < strAtts.length; i++) {
+ strAtts[i] = attrs[i].getName();
+ }
+ AttributeList attrList = mbeanServer.getAttributes(obj.getObjectName(), strAtts);
+ String component = obj.getObjectName().toString().substring(
+ obj.getObjectName().toString().indexOf('=') + 1);
+
+ for (Object attr : attrList) {
+ Attribute localAttr = (Attribute) attr;
+ if (!localAttr.getName().equalsIgnoreCase("type")) {
+ MetricFamilySamples samples = metricsMap.get(PROM_DEFAULT_PREFIX + localAttr.getName());
+ if (samples instanceof CounterMetricFamily) {
+ ((CounterMetricFamily) samples).addMetric(Arrays.asList(component),
+ Double.valueOf(localAttr.getValue().toString()));
+ } else if (samples instanceof GaugeMetricFamily) {
+ ((GaugeMetricFamily) samples).addMetric(Arrays.asList(component),
+ Double.valueOf(localAttr.getValue().toString()));
+ }
+ }
+ }
+ }
+
+ private void processKafkaMetric(Map<Object, Map<String, MetricFamilySamples>> counterMetricMap,
+ List<MetricFamilySamples> mfs, ObjectInstance obj)
+ throws InstanceNotFoundException, IntrospectionException, ReflectionException {
+
+ ObjectName objectName = obj.getObjectName();
+ String qualifiedType =
+ makeStringPromSafe(objectName.getDomain() + "_" +
+ objectName.getKeyProperty("type"));
+
+
+ TreeMap<String, String> properties = new TreeMap<>();
+ for (String key : objectName.getKeyPropertyList().keySet()) {
+ properties.put(makeStringPromSafe(key),
+ objectName.getKeyPropertyList().get(key));
+ }
+
+ // We create a unique name for the metric based on the metric that came from Kafka, plus
+ // all of the property names. Unfortunately Kafka does not have unique metric names, so
+ // the same name can otherwise end up with differing label lists (which you can't have).
+ String metricKey = qualifiedType + "_" + String.join("_",properties.keySet()) + "_";
+
+ Map<String, MetricFamilySamples> metricsMap;
+
+ // Get the attribute list now as we'll need it to create the gauge
+ MBeanAttributeInfo[] attrs = mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
+ String[] strAtts = new String[attrs.length];
+ for (int i = 0; i < strAtts.length; i++) {
+ strAtts[i] = attrs[i].getName();
+ }
+
+ // We pre-create each metric (once) before populating it once for each matching mbean
+ if (!counterMetricMap.containsKey(metricKey)) {
+ metricsMap = new HashMap<>();
+
+ for (String attr : strAtts) {
+ createGaugeIfNotExists(mfs, metricsMap, metricKey + "_" + makeStringPromSafe(attr),
+ new ArrayList<>(properties.keySet()));
+ }
+
+ counterMetricMap.put(metricKey, metricsMap);
+
+ } else {
+ metricsMap = counterMetricMap.get(metricKey);
+ }
+
+ AttributeList attrList = mbeanServer.getAttributes(obj.getObjectName(), strAtts);
+
+ for (Object attr : attrList) {
+ Attribute localAttr = (Attribute) attr;
+
+ try {
+
+ GaugeMetricFamily samples =
+ (GaugeMetricFamily) metricsMap.get(metricKey + "_" + makeStringPromSafe(localAttr.getName()));
+ samples.addMetric(new ArrayList<>(properties.values()),
+ Double.valueOf(localAttr.getValue().toString()));
+ } catch (Exception e) {
+ LOG.warn("Metric {} could not be monitored", metricKey, e);
+ }
+ }
+ }
+
+ //Prometheus is really unhappy with metric names containing . or -, so strip them out
+ private String makeStringPromSafe(String input) {
+ return input.replaceAll("[.\\-]", "");
+ }
+
+ private void createCounterIfNotExists(List<MetricFamilySamples> mfs, Map<String,
+ MetricFamilySamples> metricsMap, String counterName) {
+ if (!metricsMap.containsKey(counterName)) {
+ CounterMetricFamily labeledCounter = new CounterMetricFamily(counterName, counterName,
+ Arrays.asList(
+ "component"));
+ metricsMap.put(counterName, labeledCounter);
+ mfs.add(labeledCounter);
+ }
+ }
+
+ private void createGaugeIfNotExists(List<MetricFamilySamples> mfs, Map<String,
+ MetricFamilySamples> metricsMap, String gaugeName, List<String> labelNames) {
+ if (!metricsMap.containsKey(gaugeName)) {
+ GaugeMetricFamily labelledGauge = new GaugeMetricFamily(gaugeName, gaugeName, labelNames);
+ metricsMap.put(gaugeName, labelledGauge);
+ mfs.add(labelledGauge);
+ }
+ }
+
+ }
+
+ @Override
+ public void stop() {
+ try {
+ jettyServer.stop();
+ jettyServer.join();
+ } catch (Exception ex) {
+ LOG.error("Error stopping Jetty. Prometheus Metrics may not be available.", ex);
+ }
+
+ }
+
+}
diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java
new file mode 100644
index 000000000..76375b469
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.instrumentation.http;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.PseudoTxnMemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+
+import java.net.ServerSocket;
+
+public class BaseHTTPMetricsTest {
+ private Channel memChannel = new MemoryChannel();
+ private Channel pmemChannel = new PseudoTxnMemoryChannel();
+
+ static int getFreePort() throws Exception {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
+ }
+ }
+
+ void runLoad() {
+ memChannel.setName("memChannel");
+ pmemChannel.setName("pmemChannel");
+ Context c = new Context();
+ Configurables.configure(memChannel, c);
+ Configurables.configure(pmemChannel, c);
+ memChannel.start();
+ pmemChannel.start();
+ Transaction txn = memChannel.getTransaction();
+ txn.begin();
+ memChannel.put(EventBuilder.withBody("blah".getBytes()));
+ memChannel.put(EventBuilder.withBody("blah".getBytes()));
+ txn.commit();
+ txn.close();
+
+ txn = memChannel.getTransaction();
+ txn.begin();
+ memChannel.take();
+ txn.commit();
+ txn.close();
+
+
+ Transaction txn2 = pmemChannel.getTransaction();
+ txn2.begin();
+ pmemChannel.put(EventBuilder.withBody("blah".getBytes()));
+ pmemChannel.put(EventBuilder.withBody("blah".getBytes()));
+ txn2.commit();
+ txn2.close();
+
+ txn2 = pmemChannel.getTransaction();
+ txn2.begin();
+ pmemChannel.take();
+ txn2.commit();
+ txn2.close();
+ }
+
+ void shutdown() {
+ memChannel.stop();
+ pmemChannel.stop();
+ }
+
+}
diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
index e39b0bd18..aae7bf24c 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
@@ -21,13 +21,7 @@ package org.apache.flume.instrumentation.http;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-import org.apache.flume.Channel;
import org.apache.flume.Context;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.channel.PseudoTxnMemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXTestUtils;
import org.junit.Assert;
@@ -38,62 +32,22 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.net.HttpURLConnection;
-import java.net.ServerSocket;
import java.net.URL;
import java.util.Map;
-public class TestHTTPMetricsServer {
+public class TestHTTPMetricsServer extends BaseHTTPMetricsTest {
- Channel memChannel = new MemoryChannel();
- Channel pmemChannel = new PseudoTxnMemoryChannel();
- Type mapType = new TypeToken<Map<String, Map<String, String>>>() {}.getType();
- Gson gson = new Gson();
-
- private static int getFreePort() throws Exception {
- try (ServerSocket socket = new ServerSocket(0)) {
- return socket.getLocalPort();
- }
- }
+ private Type mapType = new TypeToken<Map<String, Map<String, String>>>() {}.getType();
+ private Gson gson = new Gson();
@Test
public void testJSON() throws Exception {
- memChannel.setName("memChannel");
- pmemChannel.setName("pmemChannel");
- Context c = new Context();
- Configurables.configure(memChannel, c);
- Configurables.configure(pmemChannel, c);
- memChannel.start();
- pmemChannel.start();
- Transaction txn = memChannel.getTransaction();
- txn.begin();
- memChannel.put(EventBuilder.withBody("blah".getBytes()));
- memChannel.put(EventBuilder.withBody("blah".getBytes()));
- txn.commit();
- txn.close();
-
- txn = memChannel.getTransaction();
- txn.begin();
- memChannel.take();
- txn.commit();
- txn.close();
-
- Transaction txn2 = pmemChannel.getTransaction();
- txn2.begin();
- pmemChannel.put(EventBuilder.withBody("blah".getBytes()));
- pmemChannel.put(EventBuilder.withBody("blah".getBytes()));
- txn2.commit();
- txn2.close();
-
- txn2 = pmemChannel.getTransaction();
- txn2.begin();
- pmemChannel.take();
- txn2.commit();
- txn2.close();
+ runLoad();
testWithPort(getFreePort());
- memChannel.stop();
- pmemChannel.stop();
+
+ shutdown();
}
private void testWithPort(int port) throws Exception {
@@ -103,7 +57,7 @@ public class TestHTTPMetricsServer {
srv.configure(context);
srv.start();
Thread.sleep(1000);
- URL url = new URL("http://0.0.0.0:" + String.valueOf(port) + "/metrics");
+ URL url = new URL("http://0.0.0.0:" + port + "/metrics");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
BufferedReader reader = new BufferedReader(
@@ -134,14 +88,14 @@ public class TestHTTPMetricsServer {
doTestForbiddenMethods(getFreePort(),"OPTIONS");
}
- public void doTestForbiddenMethods(int port, String method) throws Exception {
+ private void doTestForbiddenMethods(int port, String method) throws Exception {
MonitorService srv = new HTTPMetricsServer();
Context context = new Context();
context.put(HTTPMetricsServer.CONFIG_PORT, String.valueOf(port));
srv.configure(context);
srv.start();
Thread.sleep(1000);
- URL url = new URL("http://0.0.0.0:" + String.valueOf(port) + "/metrics");
+ URL url = new URL("http://0.0.0.0:" + port + "/metrics");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod(method);
Assert.assertEquals(HttpServletResponse.SC_FORBIDDEN, conn.getResponseCode());
diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestPrometheusMetricsServer.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestPrometheusMetricsServer.java
new file mode 100644
index 000000000..61b17cd7e
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestPrometheusMetricsServer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.instrumentation.http;
+
+import org.apache.flume.Context;
+import org.apache.flume.instrumentation.MonitorService;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class TestPrometheusMetricsServer extends BaseHTTPMetricsTest {
+
+ @Test
+ public void testMetics() throws Exception {
+ runLoad();
+
+ testWithPort(getFreePort());
+
+ shutdown();
+ }
+
+ private void testWithPort(int port) throws Exception {
+ MonitorService srv = new PrometheusHTTPMetricsServer();
+ Context context = new Context();
+ context.put(PrometheusHTTPMetricsServer.CONFIG_PORT, String.valueOf(port));
+ srv.configure(context);
+ srv.start();
+ Thread.sleep(1000);
+ URL url = new URL("http://0.0.0.0:" + port + "/metrics");
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(conn.getInputStream()));
+ String line;
+ String result = "";
+ while ((line = reader.readLine()) != null) {
+ result += line + "\n";
+ }
+ reader.close();
+ String[] targetOutputs = {"ChannelSize{component=\"pmemChannel\",} 1.0\n",
+ "Flume_ChannelSize{component=\"memChannel\",} 1.0\n",
+ "Flume_ChannelCapacity{component=\"pmemChannel\",} 0.0\n",
+ "Flume_ChannelCapacity{component=\"memChannel\",} 100.0\n",
+ "Flume_EventPutAttemptCount_total{component=\"pmemChannel\",} 2.0\n",
+ "Flume_EventPutAttemptCount_total{component=\"memChannel\",} 2.0\n",
+ "Flume_EventTakeAttemptCount_total{component=\"pmemChannel\",} 1.0\n",
+ "Flume_EventTakeAttemptCount_total{component=\"memChannel\",} 1.0\n",
+ "Flume_EventPutSuccessCount_total{component=\"pmemChannel\",} 2.0\n",
+ "Flume_EventPutSuccessCount_total{component=\"memChannel\",} 2.0\n",
+ "Flume_EventTakeSuccessCount_total{component=\"pmemChannel\",} 1.0\n",
+ "Flume_EventTakeSuccessCount_total{component=\"memChannel\",} 1.0\n"};
+
+ for (String target : targetOutputs) {
+ Assert.assertTrue(result.contains(target));
+ }
+
+ srv.stop();
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 8be106499..d500ac8fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@ limitations under the License.
<netty-all.version>4.1.72.Final</netty-all.version>
<parquet.version>1.11.2</parquet.version>
<protobuf.version>2.5.0</protobuf.version>
+ <prometheus.version>0.15.0</prometheus.version>
<rat.version>0.12</rat.version>
<scala.version>2.13</scala.version>
<scala-library.version>2.13.9</scala-library.version>
@@ -1332,6 +1333,18 @@ limitations under the License.
<version>${netty-all.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient</artifactId>
+ <version>${prometheus.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_servlet</artifactId>
+ <version>${prometheus.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-cell</artifactId>