You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ym...@apache.org on 2020/04/03 20:13:05 UTC

[nifi] branch master updated: NIFI-7273: Add flow metrics REST endpoint with for Prometheus scraping (#4156)

This is an automated email from the ASF dual-hosted git repository.

ymdavis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new a093af2  NIFI-7273: Add flow metrics REST endpoint with for Prometheus scraping (#4156)
a093af2 is described below

commit a093af2d42c34200011847f8bf40f92ccb89614d
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Apr 3 16:12:53 2020 -0400

    NIFI-7273: Add flow metrics REST endpoint with for Prometheus scraping (#4156)
    
    * NIFI-7273: Add flow metrics REST endpoint with for Prometheus scraping
    
    * NIFI-7273: Changed method name, fix handling when analytics not enabled
    
    * NIFI-7273: Removed attachment header from Prometheus metrics endpoint
    
    * NIFI-7273: Removed unused variable
---
 .../nifi-prometheus-utils/pom.xml                  | 94 +++++++++++++++++++++
 .../prometheus/util}/PrometheusMetricsUtil.java    | 97 +++++++++++++++++++---
 nifi-nar-bundles/nifi-extension-utils/pom.xml      |  1 +
 .../nifi-framework/nifi-web/nifi-web-api/pom.xml   |  5 ++
 .../org/apache/nifi/web/NiFiServiceFacade.java     |  5 ++
 .../apache/nifi/web/StandardNiFiServiceFacade.java | 52 ++++++++++++
 .../java/org/apache/nifi/web/api/FlowResource.java | 65 +++++++++++++++
 .../nifi-prometheus-reporting-task/pom.xml         | 36 +-------
 .../reporting/prometheus/PrometheusRecordSink.java | 16 +++-
 .../prometheus/PrometheusReportingTask.java        | 22 +++--
 .../prometheus/TestPrometheusRecordSink.java       |  4 +-
 .../prometheus/TestPrometheusReportingTask.java    |  2 +-
 12 files changed, 343 insertions(+), 56 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml
new file mode 100644
index 0000000..b2f8b91
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-extension-utils</artifactId>
+        <version>1.12.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-prometheus-utils</artifactId>
+    <packaging>jar</packaging>
+    <description>
+        This nifi-prometheus-utils module is designed to capture common patterns
+        and utilities that can be leveraged by components that use Prometheus capabilities to
+        help promote reuse.  These patterns may become framework level features
+        or may simply be made available through this utility.  It is ok for this
+        module to have dependencies but care should be taken when adding dependencies
+        as this increases the cost of utilizing this module in various nars.
+    </description>
+    <properties>
+        <prometheus.version>0.3.0</prometheus.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.12.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-metrics</artifactId>
+            <version>1.12.0-SNAPSHOT</version>
+        </dependency>
+        <!-- The client -->
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient</artifactId>
+            <version>${prometheus.version}</version>
+        </dependency>
+        <!-- Hotspot JVM metrics -->
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_hotspot</artifactId>
+            <version>${prometheus.version}</version>
+        </dependency>
+        <!-- Exposition servlet -->
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_servlet</artifactId>
+            <version>${prometheus.version}</version>
+        </dependency>
+    </dependencies>
+    <profiles>
+        <profile>
+            <!-- This profile, activating when compiling on Java versions above 1.8, provides configuration changes to
+                 allow NiFi to be compiled on those JDKs. -->
+            <id>jigsaw</id>
+            <activation>
+                <jdk>(1.8,)</jdk>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>javax.xml.bind</groupId>
+                    <artifactId>jaxb-api</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>com.sun.xml.bind</groupId>
+                    <artifactId>jaxb-core</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>com.sun.xml.bind</groupId>
+                    <artifactId>jaxb-impl</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+</project>
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
similarity index 83%
rename from nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
index 33222c7..0ac5f47 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
@@ -15,13 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.reporting.prometheus.api;
+package org.apache.nifi.prometheus.util;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import io.prometheus.client.Counter;
 import io.prometheus.client.SimpleCollector;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -34,11 +37,11 @@ import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.controller.status.TransmissionStatus;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.metrics.jvm.JvmMetrics;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.ssl.RestrictedSSLContextService;
 
 public class PrometheusMetricsUtil {
 
@@ -51,6 +54,10 @@ public class PrometheusMetricsUtil {
 
     private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
     private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
+    private static final CollectorRegistry CONNECTION_ANALYTICS_REGISTRY = new CollectorRegistry();
+    private static final CollectorRegistry BULLETIN_REGISTRY = new CollectorRegistry();
+
+    public static final Collection<CollectorRegistry> ALL_REGISTRIES = Arrays.asList(NIFI_REGISTRY, CONNECTION_ANALYTICS_REGISTRY, BULLETIN_REGISTRY, JVM_REGISTRY);
 
     // Common properties/values
     public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication",
@@ -81,15 +88,6 @@ public class PrometheusMetricsUtil {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
-            .name("prometheus-reporting-task-ssl-context")
-            .displayName("SSL Context Service")
-            .description("The SSL Context Service to use in order to secure the server. If specified, the server will"
-                    + "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
-            .required(false)
-            .identifiesControllerService(RestrictedSSLContextService.class)
-            .build();
-
     public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
             .name("prometheus-reporting-task-client-auth")
             .displayName("Client Authentication")
@@ -137,6 +135,16 @@ public class PrometheusMetricsUtil {
             .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
             .register(NIFI_REGISTRY);
 
+    private static final Counter TOTAL_BYTES_READ = Counter.build().name("nifi_total_bytes_read")
+            .help("Total number of bytes read by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
+    private static final Counter TOTAL_BYTES_WRITTEN = Counter.build().name("nifi_total_bytes_written")
+            .help("Total number of bytes written by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
     private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build()
             .name("nifi_amount_bytes_written")
             .help("Total number of bytes written by the component")
@@ -267,6 +275,42 @@ public class PrometheusMetricsUtil {
                     "source_id", "source_name", "destination_id", "destination_name")
             .register(NIFI_REGISTRY);
 
+    // Connection status analytics metrics
+    private static final Gauge TIME_TO_BYTES_BACKPRESSURE_PREDICTION = Gauge.build()
+            .name("nifi_time_to_bytes_backpressure_prediction")
+            .help("Predicted time (in milliseconds) until backpressure will be applied on the connection due to bytes in the queue")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(CONNECTION_ANALYTICS_REGISTRY);
+
+    private static final Gauge TIME_TO_COUNT_BACKPRESSURE_PREDICTION = Gauge.build()
+            .name("nifi_time_to_count_backpressure_prediction")
+            .help("Predicted time (in milliseconds) until backpressure will be applied on the connection due to number of objects in the queue")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(CONNECTION_ANALYTICS_REGISTRY);
+
+    private static final Gauge BYTES_AT_NEXT_INTERVAL_PREDICTION = Gauge.build()
+            .name("nifi_bytes_at_next_interval_prediction")
+            .help("Predicted number of bytes in the queue at the next configured interval")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(CONNECTION_ANALYTICS_REGISTRY);
+
+    private static final Gauge COUNT_AT_NEXT_INTERVAL_PREDICTION = Gauge.build()
+            .name("nifi_count_at_next_interval_prediction")
+            .help("Predicted number of objects in the queue at the next configured interval")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(CONNECTION_ANALYTICS_REGISTRY);
+
+    private static final Gauge BULLETIN = Gauge.build()
+            .name("nifi_bulletin")
+            .help("Bulletin reported by the NiFi instance")
+            .labelNames("instance", "component_type", "component_id", "parent_id",
+                    "node_address", "category", "source_name", "source_id", "level")
+            .register(BULLETIN_REGISTRY);
+
     ///////////////////////////////////////////////////////////////
     // JVM Metrics
     ///////////////////////////////////////////////////////////////
@@ -350,6 +394,8 @@ public class PrometheusMetricsUtil {
         AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent());
         AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead());
         AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten());
+        TOTAL_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).inc(status.getBytesRead());
+        TOTAL_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).inc(status.getBytesWritten());
         AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived());
         AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred());
 
@@ -399,6 +445,8 @@ public class PrometheusMetricsUtil {
                 AMOUNT_BYTES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesSent());
                 AMOUNT_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesRead());
                 AMOUNT_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesWritten());
+                TOTAL_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).inc(status.getBytesRead());
+                TOTAL_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).inc(status.getBytesWritten());
                 AMOUNT_BYTES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesReceived());
 
                 SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
@@ -463,6 +511,8 @@ public class PrometheusMetricsUtil {
                 AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
                 AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
                 AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
+                TOTAL_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesRead());
+                TOTAL_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesWritten());
                 AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
 
                 AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
@@ -487,6 +537,8 @@ public class PrometheusMetricsUtil {
                 AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
                 AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
                 AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
+                TOTAL_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesRead());
+                TOTAL_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesWritten());
                 AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
 
                 AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
@@ -552,4 +604,27 @@ public class PrometheusMetricsUtil {
         return JVM_REGISTRY;
     }
 
+    public static CollectorRegistry createConnectionStatusAnalyticsMetrics(StatusAnalytics statusAnalytics, String instanceId, String connComponentType, String connComponentName,
+                                                                           String connComponentId, String parentId, String sourceId, String sourceName, String destinationId, String destinationName) {
+        if(statusAnalytics != null) {
+            Map<String, Long> predictions = statusAnalytics.getPredictions(); // NOTE(review): a missing key would fail on unboxing in the set(...) calls below - confirm all four keys are always present
+            TIME_TO_BYTES_BACKPRESSURE_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                    .set(predictions.get("timeToBytesBackpressureMillis"));
+            TIME_TO_COUNT_BACKPRESSURE_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                    .set(predictions.get("timeToCountBackpressureMillis"));
+            BYTES_AT_NEXT_INTERVAL_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                    .set(predictions.get("nextIntervalBytes"));
+            COUNT_AT_NEXT_INTERVAL_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                    .set(predictions.get("nextIntervalCount"));
+        }
+        // The shared connection-analytics registry is returned even when statusAnalytics is null; in that case no gauge is set by this call.
+        return CONNECTION_ANALYTICS_REGISTRY;
+    }
+
+    public static CollectorRegistry createBulletinMetrics(String instanceId, String componentType, String componentId, String parentId, String nodeAddress,
+                                                          String category, String sourceName, String sourceId, String level) {
+        // Sets the nifi_bulletin gauge for this combination of label values to the constant 1, then returns the shared bulletin registry.
+        BULLETIN.labels(instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level).set(1);
+        return BULLETIN_REGISTRY;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/pom.xml
index 037f6b4..c5b3201 100644
--- a/nifi-nar-bundles/nifi-extension-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/pom.xml
@@ -35,6 +35,7 @@
         <module>nifi-database-utils</module>
         <module>nifi-database-test-utils</module>
         <module>nifi-service-utils</module>
+        <module>nifi-prometheus-utils</module>
     </modules>
 
 </project>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
index 30bb589..46de87a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
@@ -257,6 +257,11 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-prometheus-utils</artifactId>
+            <version>1.12.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>javax.servlet-api</artifactId>
             <scope>provided</scope>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 5107451..fe69a5a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -315,6 +315,11 @@ public interface NiFiServiceFacade {
     FlowConfigurationEntity getFlowConfiguration();
 
     /**
+     * Generates the metrics for the flow. Nothing is returned: the results are published as a side effect for the caller to read afterwards.
+     */
+    void generateFlowMetrics();
+
+    /**
      * Updates the configuration for this controller.
      *
      * @param revision Revision to compare with current base revision
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index c492123..f6d48ee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -94,12 +94,14 @@ import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.history.History;
 import org.apache.nifi.history.HistoryQuery;
 import org.apache.nifi.history.PreviousValue;
+import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.parameter.Parameter;
 import org.apache.nifi.parameter.ParameterContext;
 import org.apache.nifi.parameter.ParameterDescriptor;
 import org.apache.nifi.parameter.ParameterReferenceManager;
 import org.apache.nifi.parameter.StandardParameterContext;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
 import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.authorization.Permissions;
 import org.apache.nifi.registry.bucket.Bucket;
@@ -5295,6 +5297,56 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public void generateFlowMetrics() {
+        // Refreshes the shared PrometheusMetricsUtil registries (flow, JVM, connection analytics, bulletins); nothing is returned.
+        String instanceId = controllerFacade.getInstanceId();
+        ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root");
+        PrometheusMetricsUtil.createNifiMetrics(rootPGStatus, instanceId, "", "RootProcessGroup",
+                PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue());
+        PrometheusMetricsUtil.createJvmMetrics(JmxJvmMetrics.getInstance(), instanceId);
+
+        // Get Connection Status Analytics (predictions, e.g.)
+        Set<Connection> connections = controllerFacade.getFlowManager().findAllConnections();
+        for (Connection c : connections) {
+            // If a ResourceNotFoundException is thrown, analytics hasn't been enabled
+            try {
+                PrometheusMetricsUtil.createConnectionStatusAnalyticsMetrics(controllerFacade.getConnectionStatusAnalytics(c.getIdentifier()),
+                        instanceId,
+                        "Connection",
+                        c.getName(),
+                        c.getIdentifier(),
+                        c.getProcessGroup().getIdentifier(),
+                        c.getSource().getIdentifier(), // sourceId comes before sourceName in the util method's signature
+                        c.getSource().getName(),
+                        c.getDestination().getIdentifier(), // destinationId comes before destinationName as well
+                        c.getDestination().getName()
+                );
+            } catch (ResourceNotFoundException rnfe) {
+                break; // stop at the first miss: analytics is not enabled, so no further connection is tried
+            }
+        }
+
+        // Create a query to get all bulletins
+        final BulletinQueryDTO query = new BulletinQueryDTO();
+        BulletinBoardDTO bulletinBoardDTO = getBulletinBoard(query);
+        for(BulletinEntity bulletinEntity : bulletinBoardDTO.getBulletins()) {
+            BulletinDTO bulletin = bulletinEntity.getBulletin();
+            if(bulletin != null) {
+                PrometheusMetricsUtil.createBulletinMetrics(instanceId,
+                        "Bulletin",
+                        String.valueOf(bulletin.getId()),
+                        bulletin.getGroupId() == null ? "" : bulletin.getGroupId(),
+                        bulletin.getNodeAddress() == null ? "" : bulletin.getNodeAddress(),
+                        bulletin.getCategory(),
+                        bulletin.getSourceName(),
+                        bulletin.getSourceId(),
+                        bulletin.getLevel()
+                );
+            }
+        }
+    }
+
+    @Override
     public boolean isClustered() {
         return controllerFacade.isClustered();
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 7072214..6437249 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.web.api;
 
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -43,6 +45,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.nar.NarClassLoadersHolder;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
 import org.apache.nifi.registry.client.NiFiRegistryException;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.IllegalClusterResourceRequestException;
@@ -129,6 +132,10 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.text.Collator;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -381,6 +388,64 @@ public class FlowResource extends ApplicationResource {
         return generateOkResponse(entity).build();
     }
 
+    /**
+     * Retrieves the metrics of the entire flow, rendered in the output format of the requested producer.
+     * @param producer the metrics producer; only "prometheus" (case-insensitive) is recognized, anything else raises ResourceNotFoundException
+     * @return A plain-text streaming response containing the samples of every Prometheus registry (not an entity).
+     * @throws InterruptedException if interrupted
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.WILDCARD)
+    @Path("metrics/{producer}")
+    @ApiOperation(
+            value = "Gets all metrics for the flow from a particular node",
+            response = StreamingOutput.class,
+            authorizations = {
+                    @Authorization(value = "Read - /flow")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response getFlowMetrics(
+            @ApiParam(
+                    value = "The producer for flow file metrics. Each producer may have its own output format.",
+                    required = true
+            )
+            @PathParam("producer") final String producer) throws InterruptedException {
+
+        authorizeFlow();
+
+        if ("prometheus".equalsIgnoreCase(producer)) {
+            // refresh the shared Prometheus registries with the current state of the flow
+            serviceFacade.generateFlowMetrics();
+            // generate a streaming response
+            final StreamingOutput response = output -> {
+                // Explicit UTF-8 instead of the platform default charset; try-with-resources closes the writer even if a write fails
+                try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, java.nio.charset.StandardCharsets.UTF_8))) {
+                    for (CollectorRegistry collectorRegistry : PrometheusMetricsUtil.ALL_REGISTRIES) {
+                        TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
+                        // flush through the writer: flushing only the raw stream would leave this registry's text in the buffer
+                        writer.flush();
+                    }
+                }
+            };
+
+            return generateOkResponse(response)
+                    .type(MediaType.TEXT_PLAIN_TYPE)
+                    .build();
+        } else {
+            throw new ResourceNotFoundException("The specified producer is missing or invalid.");
+        }
+    }
+
     // -------------------
     // controller services
     // -------------------
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/pom.xml b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/pom.xml
index 300c601..367cc49 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/pom.xml
@@ -16,9 +16,6 @@
         <artifactId>nifi-prometheus-bundle</artifactId>
         <version>1.12.0-SNAPSHOT</version>
 </parent>
-<properties>
-        <prometheus.version>0.3.0</prometheus.version>
-</properties>
 
 <artifactId>nifi-prometheus-reporting-task</artifactId>
 <description>Prometheus /metrics http endpoint for monitoring</description>
@@ -26,20 +23,6 @@
 <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-metrics</artifactId>
-            <version>1.12.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-                <groupId>org.apache.nifi</groupId>
-               <artifactId>nifi-api</artifactId>
-        </dependency>
-        <dependency>
-               <groupId>org.apache.nifi</groupId>
-               <artifactId>nifi-utils</artifactId>
-               <version>1.12.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record-sink-api</artifactId>
             <version>1.12.0-SNAPSHOT</version>
         </dependency>
@@ -53,23 +36,10 @@
             <artifactId>nifi-record</artifactId>
             <version>1.12.0-SNAPSHOT</version>
         </dependency>
-        <!-- The client -->
-        <dependency>
-               <groupId>io.prometheus</groupId>
-               <artifactId>simpleclient</artifactId>
-               <version>${prometheus.version}</version>
-        </dependency>
-        <!-- Hotspot JVM metrics -->
         <dependency>
-               <groupId>io.prometheus</groupId>
-               <artifactId>simpleclient_hotspot</artifactId>
-               <version>${prometheus.version}</version>
-        </dependency>
-        <!-- Exposition servlet -->
-        <dependency>
-                <groupId>io.prometheus</groupId>
-                <artifactId>simpleclient_servlet</artifactId>
-                <version>${prometheus.version}</version>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-prometheus-utils</artifactId>
+            <version>1.12.0-SNAPSHOT</version>
         </dependency>
         <dependency>
                 <groupId>org.apache.nifi</groupId>
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
index 75ca809..3a4bd20 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
@@ -28,7 +28,7 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.record.sink.RecordSinkService;
 import org.apache.nifi.reporting.ReportingContext;
-import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
@@ -36,6 +36,7 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.eclipse.jetty.server.Server;
 
@@ -62,13 +63,22 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
     private volatile Map<String, Gauge> gauges;
     private static final CollectorRegistry RECORD_REGISTRY = new CollectorRegistry();
 
+    public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
+            .name("prometheus-reporting-task-ssl-context") // same property name as PrometheusReportingTask.SSL_CONTEXT
+            .displayName("SSL Context Service")
+            .description("The SSL Context Service to use in order to secure the server. If specified, the server will " // trailing space joins the two literals
+                    + "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
+            .required(false) // optional: see the description for the HTTP vs. HTTPS behavior
+            .identifiesControllerService(RestrictedSSLContextService.class)
+            .build();
+
     private static final List<PropertyDescriptor> properties;
 
     static {
         List<PropertyDescriptor> props = new ArrayList<>();
         props.add(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT);
         props.add(PrometheusMetricsUtil.INSTANCE_ID);
-        props.add(PrometheusMetricsUtil.SSL_CONTEXT);
+        props.add(SSL_CONTEXT);
         props.add(PrometheusMetricsUtil.CLIENT_AUTH);
         properties = Collections.unmodifiableList(props);
     }
@@ -81,7 +91,7 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
     @OnEnabled
     public void onScheduled(final ConfigurationContext context) {
         RECORD_REGISTRY.clear();
-        SSLContextService sslContextService = context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT).asControllerService(SSLContextService.class);
+        SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
         final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).getValue();
 
         try {
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
index c8a269a..91b1bc0 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
@@ -36,14 +36,15 @@ import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
 import org.apache.nifi.reporting.AbstractReportingTask;
 import org.apache.nifi.reporting.ReportingContext;
-import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
 import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.eclipse.jetty.server.Server;
 
-import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
-import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
-import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
+import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
+import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
+import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
 
 @Tags({ "reporting", "prometheus", "metrics", "time series data" })
 @CapabilityDescription("Reports metrics in Prometheus format by creating /metrics http endpoint which can be used for external monitoring of the application."
@@ -53,6 +54,15 @@ public class PrometheusReportingTask extends AbstractReportingTask {
 
     private PrometheusServer prometheusServer;
 
+    public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
+            .name("prometheus-reporting-task-ssl-context")
+            .displayName("SSL Context Service")
+            .description("The SSL Context Service to use in order to secure the server. If specified, the server will " // trailing space joins the two literals
+                    + "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
+            .required(false) // optional: see the description for the HTTP vs. HTTPS behavior
+            .identifiesControllerService(RestrictedSSLContextService.class)
+            .build();
+
     public static final PropertyDescriptor METRICS_STRATEGY = new PropertyDescriptor.Builder()
             .name("prometheus-reporting-task-metrics-strategy")
             .displayName("Metrics Reporting Strategy")
@@ -79,7 +89,7 @@ public class PrometheusReportingTask extends AbstractReportingTask {
         props.add(PrometheusMetricsUtil.INSTANCE_ID);
         props.add(METRICS_STRATEGY);
         props.add(SEND_JVM_METRICS);
-        props.add(PrometheusMetricsUtil.SSL_CONTEXT);
+        props.add(SSL_CONTEXT);
         props.add(PrometheusMetricsUtil.CLIENT_AUTH);
         properties = Collections.unmodifiableList(props);
     }
@@ -91,7 +101,7 @@ public class PrometheusReportingTask extends AbstractReportingTask {
 
     @OnScheduled
     public void onScheduled(final ConfigurationContext context) {
-        SSLContextService sslContextService = context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT).asControllerService(SSLContextService.class);
+        SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
         final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).getValue();
 
         try {
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
index 1aaf095..02a54b9 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
@@ -30,7 +30,7 @@ import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.ListRecordSet;
@@ -128,7 +128,7 @@ public class TestPrometheusRecordSink {
 
         final PropertyValue pValue = mock(StandardPropertyValue.class);
         when(context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT)).thenReturn(new MockPropertyValue(portString));
-        when(context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT)).thenReturn(pValue);
+        when(context.getProperty(PrometheusRecordSink.SSL_CONTEXT)).thenReturn(pValue);
         when(pValue.asControllerService(SSLContextService.class)).thenReturn(null);
 
         final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(task, UUID.randomUUID().toString(), logger, stateManager);
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
index 0943e0b..381c68f 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
@@ -33,7 +33,7 @@ import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.RunStatus;
 import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockComponentLog;
 import org.apache.nifi.util.MockConfigurationContext;