You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by tr...@apache.org on 2022/10/30 19:31:16 UTC

[flume] branch trunk updated: Add support for making metrics available to Prometheus, including exposing Kafka Producer and Consumer metrics. (#358)

This is an automated email from the ASF dual-hosted git repository.

tristan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1821a17e6 Add support for making metrics available to Prometheus, including exposing Kafka Producer and Consumer metrics. (#358)
1821a17e6 is described below

commit 1821a17e6fe8bf02d479daa10c47e357d1cc1b98
Author: Tristan Stevens <tm...@users.noreply.github.com>
AuthorDate: Sun Oct 30 19:31:08 2022 +0000

    Add support for making metrics available to Prometheus, including exposing Kafka Producer and Consumer metrics. (#358)
    
    Co-authored-by: Tristan Stevens <tr...@cloudera.com>
---
 flume-ng-core/pom.xml                              |  10 +
 .../flume/instrumentation/MonitoringType.java      |   3 +-
 .../instrumentation/http/HTTPMetricsServer.java    |   6 +-
 .../http/PrometheusHTTPMetricsServer.java          | 295 +++++++++++++++++++++
 .../instrumentation/http/BaseHTTPMetricsTest.java  |  83 ++++++
 .../http/TestHTTPMetricsServer.java                |  64 +----
 .../http/TestPrometheusMetricsServer.java          |  81 ++++++
 pom.xml                                            |  13 +
 8 files changed, 498 insertions(+), 57 deletions(-)

diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index 7ae90a94e..7b9455ae7 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -408,6 +408,16 @@ limitations under the License.
       <artifactId>mina-core</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient_servlet</artifactId>
+    </dependency>
+
   </dependencies>
 
 </project>
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
index 4e1a28c26..f0deec9ca 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
@@ -25,7 +25,8 @@ package org.apache.flume.instrumentation;
 public enum MonitoringType {
   OTHER(null),
   GANGLIA(org.apache.flume.instrumentation.GangliaServer.class),
-  HTTP(org.apache.flume.instrumentation.http.HTTPMetricsServer.class);
+  HTTP(org.apache.flume.instrumentation.http.HTTPMetricsServer.class),
+  PROMETHEUS(org.apache.flume.instrumentation.http.PrometheusHTTPMetricsServer.class);
 
   private Class<? extends MonitorService> monitoringClass;
 
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
index ebb01fe13..efefb6e12 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
@@ -54,7 +54,7 @@ public class HTTPMetricsServer implements MonitorService {
   private Server jettyServer;
   private int port;
   private static Logger LOG = LoggerFactory.getLogger(HTTPMetricsServer.class);
-  public static int DEFAULT_PORT = 41414;
+  private static int DEFAULT_PORT = 41414;
   public static String CONFIG_PORT = "port";
 
   @Override
@@ -91,6 +91,10 @@ public class HTTPMetricsServer implements MonitorService {
 
   }
 
+  public int getPort() {
+    return port;
+  }
+
   @Override
   public void configure(Context context) {
     port = context.getInteger(CONFIG_PORT, DEFAULT_PORT);
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
new file mode 100644
index 000000000..b8c4c15f6
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.instrumentation.http;
+
+import com.google.common.base.Throwables;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.GaugeMetricFamily;
+import io.prometheus.client.exporter.MetricsServlet;
+import org.apache.flume.instrumentation.MonitorService;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * A Monitor service implementation that runs a web server on a configurable
+ * port and returns the metrics for components in JSON format. <p> Optional
+ * parameters: <p> <tt>port</tt> : The port on which the server should listen
+ * to.<p> Returns metrics in the following format: <p>
+ *
+ * {<p> "componentName1":{"metric1" : "metricValue1","metric2":"metricValue2"}
+ * <p> "componentName1":{"metric3" : "metricValue3","metric4":"metricValue4"}
+ * <p> }
+ */
+public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements MonitorService {
+
+  private static final String PROM_DEFAULT_PREFIX = "Flume_";
+  private Server jettyServer;
+  private static Logger LOG = LoggerFactory.getLogger(PrometheusHTTPMetricsServer.class);
+  private static MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+  private FlumePrometheusCollector requests;
+
+  @Override
+  public void start() {
+
+    requests = new FlumePrometheusCollector().register();
+
+    jettyServer = new Server();
+    //We can use Contexts etc if we have many urls to handle. For one url,
+    //specifying a handler directly is the most efficient.
+    HttpConfiguration httpConfiguration = new HttpConfiguration();
+    ServerConnector connector = new ServerConnector(jettyServer,
+        new HttpConnectionFactory(httpConfiguration));
+    connector.setReuseAddress(true);
+    connector.setPort(getPort());
+    jettyServer.addConnector(connector);
+    ServletContextHandler context = new ServletContextHandler();
+    context.setContextPath("/");
+    jettyServer.setHandler(context);
+    context.addServlet(new ServletHolder(new MetricsServlet()),"/metrics");
+    try {
+      jettyServer.start();
+      while (!jettyServer.isStarted()) {
+        Thread.sleep(500);
+      }
+    } catch (Exception ex) {
+      LOG.error("Error starting Jetty. JSON Metrics may not be available.", ex);
+    }
+
+  }
+
+  class FlumePrometheusCollector extends Collector {
+
+    public List<MetricFamilySamples> collect() {
+
+      Map<Object, Map<String, MetricFamilySamples>> counterMetricMap = new HashMap<>();
+      List<Collector.MetricFamilySamples> mfs = new ArrayList<>();
+
+      Set<ObjectInstance> queryMBeans;
+      try {
+        queryMBeans = mbeanServer.queryMBeans(null, null);
+
+        for (ObjectInstance obj : queryMBeans) {
+          try {
+            if (obj.getObjectName().toString().startsWith("org.apache.flume")) {
+              processFlumeMetric(counterMetricMap, mfs, obj);
+            } else if ((obj.getObjectName().toString().startsWith("kafka.consumer") ||
+                        obj.getObjectName().toString().startsWith("kafka.producer"))
+                      && obj.getObjectName().toString().contains("metrics")) {
+              processKafkaMetric(counterMetricMap, mfs, obj);
+            }
+
+
+
+          } catch (Exception e) {
+            LOG.error("Unable to poll JMX for metrics.", e);
+          }
+
+        }
+        return mfs;
+
+      } catch (Exception ex) {
+        LOG.error("Could not get Mbeans for monitoring", ex);
+        Throwables.propagate(ex);
+        return null;
+      }
+    }
+
+    private void processFlumeMetric(Map<Object, Map<String, MetricFamilySamples>> counterMetricMap,
+                                    List<MetricFamilySamples> mfs, ObjectInstance obj)
+        throws ClassNotFoundException, InstanceNotFoundException,
+        IntrospectionException, ReflectionException {
+      Class mbeanClass = Class.forName(obj.getClassName());
+      Map<String, MetricFamilySamples> metricsMap;
+
+      if (!counterMetricMap.containsKey(mbeanClass)) {
+        metricsMap = new HashMap<>();
+
+        for (Method method : mbeanClass.getMethods()) {
+          String methodName = method.getName();
+          if (methodName.startsWith("increment") && methodName.length() > "increment".length()) {
+            String counterName = PROM_DEFAULT_PREFIX + methodName.substring("increment".length());
+            createCounterIfNotExists(mfs, metricsMap, counterName);
+          } else if (methodName.startsWith("addTo")) {
+            String counterName = PROM_DEFAULT_PREFIX + methodName.substring("addTo".length());
+            createCounterIfNotExists(mfs, metricsMap, counterName);
+          } else if (methodName.startsWith("set")) {
+            String counterName = PROM_DEFAULT_PREFIX + methodName.substring("set".length());
+            createGaugeIfNotExists(mfs, metricsMap, counterName, Arrays.asList("component"));
+          }
+        }
+
+        counterMetricMap.put(mbeanClass, metricsMap);
+
+      } else {
+        metricsMap = counterMetricMap.get(mbeanClass);
+      }
+
+      MBeanAttributeInfo[] attrs = mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
+      String[] strAtts = new String[attrs.length];
+      for (int i = 0; i < strAtts.length; i++) {
+        strAtts[i] = attrs[i].getName();
+      }
+      AttributeList attrList = mbeanServer.getAttributes(obj.getObjectName(), strAtts);
+      String component = obj.getObjectName().toString().substring(
+              obj.getObjectName().toString().indexOf('=') + 1);
+
+      for (Object attr : attrList) {
+        Attribute localAttr = (Attribute) attr;
+        if (!localAttr.getName().equalsIgnoreCase("type")) {
+          MetricFamilySamples samples = metricsMap.get(PROM_DEFAULT_PREFIX + localAttr.getName());
+          if (samples instanceof CounterMetricFamily) {
+            ((CounterMetricFamily) samples).addMetric(Arrays.asList(component),
+                    Double.valueOf(localAttr.getValue().toString()));
+          } else if (samples instanceof GaugeMetricFamily) {
+            ((GaugeMetricFamily) samples).addMetric(Arrays.asList(component),
+                    Double.valueOf(localAttr.getValue().toString()));
+          }
+        }
+      }
+    }
+
+    private void processKafkaMetric(Map<Object, Map<String, MetricFamilySamples>> counterMetricMap,
+                                List<MetricFamilySamples> mfs, ObjectInstance obj)
+        throws InstanceNotFoundException, IntrospectionException, ReflectionException {
+
+      ObjectName objectName = obj.getObjectName();
+      String qualifiedType =
+          makeStringPromSafe(objectName.getDomain() + "_" +
+              objectName.getKeyProperty("type"));
+
+
+      TreeMap<String, String> properties = new TreeMap<>();
+      for (String key : objectName.getKeyPropertyList().keySet()) {
+        properties.put(makeStringPromSafe(key),
+            objectName.getKeyPropertyList().get(key));
+      }
+
+      // We create a unique name for the metric based on the metric that came from Kafka, plus
+      // all of the properties. Unfortunately Kafka does not have unique metric names and therefore
+      // you can end up with metrics with differing property lists (which you can't have.
+      String metricKey = qualifiedType + "_" + String.join("_",properties.keySet()) + "_";
+
+      Map<String, MetricFamilySamples> metricsMap;
+
+      // Get the attribute list now as we'll need it to create the gauge
+      MBeanAttributeInfo[] attrs = mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
+      String[] strAtts = new String[attrs.length];
+      for (int i = 0; i < strAtts.length; i++) {
+        strAtts[i] = attrs[i].getName();
+      }
+
+      // We pre-create each metric (once) before populating it once for each matching mbean
+      if (!counterMetricMap.containsKey(metricKey)) {
+        metricsMap = new HashMap<>();
+
+        for (String attr : strAtts) {
+          createGaugeIfNotExists(mfs, metricsMap, metricKey + "_" + makeStringPromSafe(attr),
+              new ArrayList<>(properties.keySet()));
+        }
+
+        counterMetricMap.put(metricKey, metricsMap);
+
+      } else {
+        metricsMap = counterMetricMap.get(metricKey);
+      }
+
+      AttributeList attrList = mbeanServer.getAttributes(obj.getObjectName(), strAtts);
+
+      for (Object attr : attrList) {
+        Attribute localAttr = (Attribute) attr;
+
+        try {
+
+          GaugeMetricFamily samples =
+              (GaugeMetricFamily) metricsMap.get(metricKey + "_" + makeStringPromSafe(localAttr.getName()));
+          samples.addMetric(new ArrayList<>(properties.values()),
+              Double.valueOf(localAttr.getValue().toString()));
+        } catch (Exception e) {
+          LOG.warn("Metric {} could not be monitored", metricKey, e);
+        }
+      }
+    }
+
+    //Prometeus is really unhappy with metrics with , or - in, so replace them
+    private String makeStringPromSafe(String input) {
+      return input.replaceAll("[.\\-]", "");
+    }
+
+    private void createCounterIfNotExists(List<MetricFamilySamples> mfs, Map<String,
+            MetricFamilySamples> metricsMap, String counterName) {
+      if (!metricsMap.containsKey(counterName)) {
+        CounterMetricFamily labeledCounter = new CounterMetricFamily(counterName, counterName,
+                Arrays.asList(
+                        "component"));
+        metricsMap.put(counterName, labeledCounter);
+        mfs.add(labeledCounter);
+      }
+    }
+
+    private void createGaugeIfNotExists(List<MetricFamilySamples> mfs, Map<String,
+            MetricFamilySamples> metricsMap, String gaugeName, List<String> labelNames) {
+      if (!metricsMap.containsKey(gaugeName)) {
+        GaugeMetricFamily labelledGauge = new GaugeMetricFamily(gaugeName, gaugeName, labelNames);
+        metricsMap.put(gaugeName, labelledGauge);
+        mfs.add(labelledGauge);
+      }
+    }
+
+  }
+
+  @Override
+  public void stop() {
+    try {
+      jettyServer.stop();
+      jettyServer.join();
+    } catch (Exception ex) {
+      LOG.error("Error stopping Jetty. Prometheus Metrics may not be available.", ex);
+    }
+
+  }
+
+}
diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java
new file mode 100644
index 000000000..76375b469
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/BaseHTTPMetricsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.instrumentation.http;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.PseudoTxnMemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+
+import java.net.ServerSocket;
+
+public class BaseHTTPMetricsTest {
+  private Channel memChannel = new MemoryChannel();
+  private Channel pmemChannel = new PseudoTxnMemoryChannel();
+
+  static int getFreePort() throws Exception {
+    try (ServerSocket socket = new ServerSocket(0)) {
+      return socket.getLocalPort();
+    }
+  }
+
+  void runLoad() {
+    memChannel.setName("memChannel");
+    pmemChannel.setName("pmemChannel");
+    Context c = new Context();
+    Configurables.configure(memChannel, c);
+    Configurables.configure(pmemChannel, c);
+    memChannel.start();
+    pmemChannel.start();
+    Transaction txn = memChannel.getTransaction();
+    txn.begin();
+    memChannel.put(EventBuilder.withBody("blah".getBytes()));
+    memChannel.put(EventBuilder.withBody("blah".getBytes()));
+    txn.commit();
+    txn.close();
+
+    txn = memChannel.getTransaction();
+    txn.begin();
+    memChannel.take();
+    txn.commit();
+    txn.close();
+
+
+    Transaction txn2 = pmemChannel.getTransaction();
+    txn2.begin();
+    pmemChannel.put(EventBuilder.withBody("blah".getBytes()));
+    pmemChannel.put(EventBuilder.withBody("blah".getBytes()));
+    txn2.commit();
+    txn2.close();
+
+    txn2 = pmemChannel.getTransaction();
+    txn2.begin();
+    pmemChannel.take();
+    txn2.commit();
+    txn2.close();
+  }
+
+  void shutdown() {
+    memChannel.stop();
+    pmemChannel.stop();
+  }
+
+}
diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
index e39b0bd18..aae7bf24c 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
@@ -21,13 +21,7 @@ package org.apache.flume.instrumentation.http;
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-import org.apache.flume.Channel;
 import org.apache.flume.Context;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.channel.PseudoTxnMemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.MonitorService;
 import org.apache.flume.instrumentation.util.JMXTestUtils;
 import org.junit.Assert;
@@ -38,62 +32,22 @@ import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.lang.reflect.Type;
 import java.net.HttpURLConnection;
-import java.net.ServerSocket;
 import java.net.URL;
 import java.util.Map;
 
-public class TestHTTPMetricsServer {
+public class TestHTTPMetricsServer extends BaseHTTPMetricsTest {
 
-  Channel memChannel = new MemoryChannel();
-  Channel pmemChannel = new PseudoTxnMemoryChannel();
-  Type mapType = new TypeToken<Map<String, Map<String, String>>>() {}.getType();
-  Gson gson = new Gson();
-
-  private static int getFreePort() throws Exception {
-    try (ServerSocket socket = new ServerSocket(0)) {
-      return socket.getLocalPort();
-    }
-  }
+  private Type mapType = new TypeToken<Map<String, Map<String, String>>>() {}.getType();
+  private Gson gson = new Gson();
 
   @Test
   public void testJSON() throws Exception {
-    memChannel.setName("memChannel");
-    pmemChannel.setName("pmemChannel");
-    Context c = new Context();
-    Configurables.configure(memChannel, c);
-    Configurables.configure(pmemChannel, c);
-    memChannel.start();
-    pmemChannel.start();
-    Transaction txn = memChannel.getTransaction();
-    txn.begin();
-    memChannel.put(EventBuilder.withBody("blah".getBytes()));
-    memChannel.put(EventBuilder.withBody("blah".getBytes()));
-    txn.commit();
-    txn.close();
-
-    txn = memChannel.getTransaction();
-    txn.begin();
-    memChannel.take();
-    txn.commit();
-    txn.close();
 
-
-    Transaction txn2 = pmemChannel.getTransaction();
-    txn2.begin();
-    pmemChannel.put(EventBuilder.withBody("blah".getBytes()));
-    pmemChannel.put(EventBuilder.withBody("blah".getBytes()));
-    txn2.commit();
-    txn2.close();
-
-    txn2 = pmemChannel.getTransaction();
-    txn2.begin();
-    pmemChannel.take();
-    txn2.commit();
-    txn2.close();
+    runLoad();
 
     testWithPort(getFreePort());
-    memChannel.stop();
-    pmemChannel.stop();
+
+    shutdown();
   }
 
   private void testWithPort(int port) throws Exception {
@@ -103,7 +57,7 @@ public class TestHTTPMetricsServer {
     srv.configure(context);
     srv.start();
     Thread.sleep(1000);
-    URL url = new URL("http://0.0.0.0:" + String.valueOf(port) + "/metrics");
+    URL url = new URL("http://0.0.0.0:" + port + "/metrics");
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     conn.setRequestMethod("GET");
     BufferedReader reader = new BufferedReader(
@@ -134,14 +88,14 @@ public class TestHTTPMetricsServer {
     doTestForbiddenMethods(getFreePort(),"OPTIONS");
   }
 
-  public void doTestForbiddenMethods(int port, String method) throws Exception {
+  private void doTestForbiddenMethods(int port, String method) throws Exception {
     MonitorService srv = new HTTPMetricsServer();
     Context context = new Context();
     context.put(HTTPMetricsServer.CONFIG_PORT, String.valueOf(port));
     srv.configure(context);
     srv.start();
     Thread.sleep(1000);
-    URL url = new URL("http://0.0.0.0:" + String.valueOf(port) + "/metrics");
+    URL url = new URL("http://0.0.0.0:" + port + "/metrics");
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     conn.setRequestMethod(method);
     Assert.assertEquals(HttpServletResponse.SC_FORBIDDEN, conn.getResponseCode());
diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestPrometheusMetricsServer.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestPrometheusMetricsServer.java
new file mode 100644
index 000000000..61b17cd7e
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestPrometheusMetricsServer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.instrumentation.http;
+
+import org.apache.flume.Context;
+import org.apache.flume.instrumentation.MonitorService;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class TestPrometheusMetricsServer extends BaseHTTPMetricsTest {
+
+  @Test
+  public void testMetics() throws Exception {
+    runLoad();
+
+    testWithPort(getFreePort());
+
+    shutdown();
+  }
+
+  private void testWithPort(int port) throws Exception {
+    MonitorService srv = new PrometheusHTTPMetricsServer();
+    Context context = new Context();
+    context.put(PrometheusHTTPMetricsServer.CONFIG_PORT, String.valueOf(port));
+    srv.configure(context);
+    srv.start();
+    Thread.sleep(1000);
+    URL url = new URL("http://0.0.0.0:" + port + "/metrics");
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod("GET");
+    BufferedReader reader = new BufferedReader(
+        new InputStreamReader(conn.getInputStream()));
+    String line;
+    String result = "";
+    while ((line = reader.readLine()) != null) {
+      result += line + "\n";
+    }
+    reader.close();
+    String[] targetOutputs = {"ChannelSize{component=\"pmemChannel\",} 1.0\n",
+      "Flume_ChannelSize{component=\"memChannel\",} 1.0\n",
+      "Flume_ChannelCapacity{component=\"pmemChannel\",} 0.0\n",
+      "Flume_ChannelCapacity{component=\"memChannel\",} 100.0\n",
+      "Flume_EventPutAttemptCount_total{component=\"pmemChannel\",} 2.0\n",
+      "Flume_EventPutAttemptCount_total{component=\"memChannel\",} 2.0\n",
+      "Flume_EventTakeAttemptCount_total{component=\"pmemChannel\",} 1.0\n",
+      "Flume_EventTakeAttemptCount_total{component=\"memChannel\",} 1.0\n",
+      "Flume_EventPutSuccessCount_total{component=\"pmemChannel\",} 2.0\n",
+      "Flume_EventPutSuccessCount_total{component=\"memChannel\",} 2.0\n",
+      "Flume_EventTakeSuccessCount_total{component=\"pmemChannel\",} 1.0\n",
+      "Flume_EventTakeSuccessCount_total{component=\"memChannel\",} 1.0\n"};
+
+    for (String target : targetOutputs) {
+      Assert.assertTrue(result.contains(target));
+    }
+
+    srv.stop();
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index 8be106499..d500ac8fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@ limitations under the License.
     <netty-all.version>4.1.72.Final</netty-all.version>
     <parquet.version>1.11.2</parquet.version>
     <protobuf.version>2.5.0</protobuf.version>
+    <prometheus.version>0.15.0</prometheus.version>
     <rat.version>0.12</rat.version>
     <scala.version>2.13</scala.version>
     <scala-library.version>2.13.9</scala-library.version>
@@ -1332,6 +1333,18 @@ limitations under the License.
         <version>${netty-all.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>io.prometheus</groupId>
+        <artifactId>simpleclient</artifactId>
+        <version>${prometheus.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.prometheus</groupId>
+        <artifactId>simpleclient_servlet</artifactId>
+        <version>${prometheus.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.solr</groupId>
         <artifactId>solr-cell</artifactId>