You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ym...@apache.org on 2020/04/03 20:13:05 UTC
[nifi] branch master updated: NIFI-7273: Add flow metrics REST
endpoint with for Prometheus scraping (#4156)
This is an automated email from the ASF dual-hosted git repository.
ymdavis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new a093af2 NIFI-7273: Add flow metrics REST endpoint with for Prometheus scraping (#4156)
a093af2 is described below
commit a093af2d42c34200011847f8bf40f92ccb89614d
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Apr 3 16:12:53 2020 -0400
NIFI-7273: Add flow metrics REST endpoint with for Prometheus scraping (#4156)
* NIFI-7273: Add flow metrics REST endpoint with for Prometheus scraping
* NIFI-7273: Changed method name, fix handling when analytics not enabled
* NIFI-7273: Removed attachment header from Prometheus metrics endpoint
* NIFI-7273: Removed unused variable
---
.../nifi-prometheus-utils/pom.xml | 94 +++++++++++++++++++++
.../prometheus/util}/PrometheusMetricsUtil.java | 97 +++++++++++++++++++---
nifi-nar-bundles/nifi-extension-utils/pom.xml | 1 +
.../nifi-framework/nifi-web/nifi-web-api/pom.xml | 5 ++
.../org/apache/nifi/web/NiFiServiceFacade.java | 5 ++
.../apache/nifi/web/StandardNiFiServiceFacade.java | 52 ++++++++++++
.../java/org/apache/nifi/web/api/FlowResource.java | 65 +++++++++++++++
.../nifi-prometheus-reporting-task/pom.xml | 36 +-------
.../reporting/prometheus/PrometheusRecordSink.java | 16 +++-
.../prometheus/PrometheusReportingTask.java | 22 +++--
.../prometheus/TestPrometheusRecordSink.java | 4 +-
.../prometheus/TestPrometheusReportingTask.java | 2 +-
12 files changed, 343 insertions(+), 56 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml
new file mode 100644
index 0000000..b2f8b91
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-extension-utils</artifactId>
+ <version>1.12.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>nifi-prometheus-utils</artifactId>
+ <packaging>jar</packaging>
+ <description>
+ This nifi-prometheus-utils module is designed to capture common patterns
+ and utilities that can be leveraged by components that use Prometheus capabilities to
+ help promote reuse. These patterns may become framework level features
+ or may simply be made available through this utility. It is ok for this
+ module to have dependencies but care should be taken when adding dependencies
+ as this increases the cost of utilizing this module in various nars.
+ </description>
+ <properties>
+ <prometheus.version>0.3.0</prometheus.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>1.12.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-metrics</artifactId>
+ <version>1.12.0-SNAPSHOT</version>
+ </dependency>
+ <!-- The client -->
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient</artifactId>
+ <version>${prometheus.version}</version>
+ </dependency>
+ <!-- Hotspot JVM metrics -->
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_hotspot</artifactId>
+ <version>${prometheus.version}</version>
+ </dependency>
+ <!-- Exposition servlet -->
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_servlet</artifactId>
+ <version>${prometheus.version}</version>
+ </dependency>
+ </dependencies>
+ <profiles>
+ <profile>
+ <!-- This profile, activating when compiling on Java versions above 1.8, provides configuration changes to
+ allow NiFi to be compiled on those JDKs. -->
+ <id>jigsaw</id>
+ <activation>
+ <jdk>(1.8,)</jdk>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.xml.bind</groupId>
+ <artifactId>jaxb-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.xml.bind</groupId>
+ <artifactId>jaxb-impl</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+</project>
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
similarity index 83%
rename from nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
index 33222c7..0ac5f47 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
@@ -15,13 +15,16 @@
* limitations under the License.
*/
-package org.apache.nifi.reporting.prometheus.api;
+package org.apache.nifi.prometheus.util;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import io.prometheus.client.Counter;
import io.prometheus.client.SimpleCollector;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
@@ -34,11 +37,11 @@ import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.ssl.RestrictedSSLContextService;
public class PrometheusMetricsUtil {
@@ -51,6 +54,10 @@ public class PrometheusMetricsUtil {
private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
+ private static final CollectorRegistry CONNECTION_ANALYTICS_REGISTRY = new CollectorRegistry();
+ private static final CollectorRegistry BULLETIN_REGISTRY = new CollectorRegistry();
+
+ public static final Collection<CollectorRegistry> ALL_REGISTRIES = Arrays.asList(NIFI_REGISTRY, CONNECTION_ANALYTICS_REGISTRY, BULLETIN_REGISTRY, JVM_REGISTRY);
// Common properties/values
public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication",
@@ -81,15 +88,6 @@ public class PrometheusMetricsUtil {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
- .name("prometheus-reporting-task-ssl-context")
- .displayName("SSL Context Service")
- .description("The SSL Context Service to use in order to secure the server. If specified, the server will"
- + "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
- .required(false)
- .identifiesControllerService(RestrictedSSLContextService.class)
- .build();
-
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-client-auth")
.displayName("Client Authentication")
@@ -137,6 +135,16 @@ public class PrometheusMetricsUtil {
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
+ private static final Counter TOTAL_BYTES_READ = Counter.build().name("nifi_total_bytes_read")
+ .help("Total number of bytes read by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+ .register(NIFI_REGISTRY);
+
+ private static final Counter TOTAL_BYTES_WRITTEN = Counter.build().name("nifi_total_bytes_written")
+ .help("Total number of bytes written by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+ .register(NIFI_REGISTRY);
+
private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build()
.name("nifi_amount_bytes_written")
.help("Total number of bytes written by the component")
@@ -267,6 +275,42 @@ public class PrometheusMetricsUtil {
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
+ // Connection status analytics metrics
+ private static final Gauge TIME_TO_BYTES_BACKPRESSURE_PREDICTION = Gauge.build()
+ .name("nifi_time_to_bytes_backpressure_prediction")
+ .help("Predicted time (in milliseconds) until backpressure will be applied on the connection due to bytes in the queue")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
+ .register(CONNECTION_ANALYTICS_REGISTRY);
+
+ private static final Gauge TIME_TO_COUNT_BACKPRESSURE_PREDICTION = Gauge.build()
+ .name("nifi_time_to_count_backpressure_prediction")
+ .help("Predicted time (in milliseconds) until backpressure will be applied on the connection due to number of objects in the queue")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
+ .register(CONNECTION_ANALYTICS_REGISTRY);
+
+ private static final Gauge BYTES_AT_NEXT_INTERVAL_PREDICTION = Gauge.build()
+ .name("nifi_bytes_at_next_interval_prediction")
+ .help("Predicted number of bytes in the queue at the next configured interval")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
+ .register(CONNECTION_ANALYTICS_REGISTRY);
+
+ private static final Gauge COUNT_AT_NEXT_INTERVAL_PREDICTION = Gauge.build()
+ .name("nifi_count_at_next_interval_prediction")
+ .help("Predicted number of objects in the queue at the next configured interval")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
+ .register(CONNECTION_ANALYTICS_REGISTRY);
+
+ private static final Gauge BULLETIN = Gauge.build()
+ .name("nifi_bulletin")
+ .help("Bulletin reported by the NiFi instance")
+ .labelNames("instance", "component_type", "component_id", "parent_id",
+ "node_address", "category", "source_name", "source_id", "level")
+ .register(BULLETIN_REGISTRY);
+
///////////////////////////////////////////////////////////////
// JVM Metrics
///////////////////////////////////////////////////////////////
@@ -350,6 +394,8 @@ public class PrometheusMetricsUtil {
AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead());
AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten());
+ TOTAL_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).inc(status.getBytesRead());
+ TOTAL_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).inc(status.getBytesWritten());
AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived());
AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred());
@@ -399,6 +445,8 @@ public class PrometheusMetricsUtil {
AMOUNT_BYTES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesRead());
AMOUNT_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesWritten());
+ TOTAL_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).inc(status.getBytesRead());
+ TOTAL_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).inc(status.getBytesWritten());
AMOUNT_BYTES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesReceived());
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
@@ -463,6 +511,8 @@ public class PrometheusMetricsUtil {
AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
+ TOTAL_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesRead());
+ TOTAL_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesWritten());
AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
@@ -487,6 +537,8 @@ public class PrometheusMetricsUtil {
AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
+ TOTAL_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesRead());
+ TOTAL_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesWritten());
AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
@@ -552,4 +604,27 @@ public class PrometheusMetricsUtil {
return JVM_REGISTRY;
}
+    /**
+     * Records the status-analytics predictions of a single connection as gauges in the connection analytics registry.
+     * <p>
+     * Nothing is recorded when {@code statusAnalytics} is null or supplies no prediction map, and any individual
+     * prediction missing from the map is skipped. Without these guards a missing entry would be auto-unboxed
+     * from a null {@code Long} and fail with a NullPointerException.
+     *
+     * @param statusAnalytics   analytics for the connection; may be null
+     * @param instanceId        value of the "instance" label
+     * @param connComponentType value of the "component_type" label
+     * @param connComponentName value of the "component_name" label
+     * @param connComponentId   value of the "component_id" label
+     * @param parentId          value of the "parent_id" label
+     * @param sourceId          value of the "source_id" label
+     * @param sourceName        value of the "source_name" label
+     * @param destinationId     value of the "destination_id" label
+     * @param destinationName   value of the "destination_name" label
+     * @return the registry in which the connection analytics gauges are registered
+     */
+    public static CollectorRegistry createConnectionStatusAnalyticsMetrics(StatusAnalytics statusAnalytics, String instanceId, String connComponentType, String connComponentName,
+                                                                           String connComponentId, String parentId, String sourceId, String sourceName, String destinationId, String destinationName) {
+        final Map<String, Long> predictions = statusAnalytics == null ? null : statusAnalytics.getPredictions();
+        if (predictions != null) {
+            // Every prediction gauge shares the same label set, in the order the gauges were declared with.
+            final String[] labelValues = {instanceId, connComponentType, connComponentName, connComponentId, parentId,
+                    sourceId, sourceName, destinationId, destinationName};
+            setPredictionIfPresent(TIME_TO_BYTES_BACKPRESSURE_PREDICTION, predictions.get("timeToBytesBackpressureMillis"), labelValues);
+            setPredictionIfPresent(TIME_TO_COUNT_BACKPRESSURE_PREDICTION, predictions.get("timeToCountBackpressureMillis"), labelValues);
+            setPredictionIfPresent(BYTES_AT_NEXT_INTERVAL_PREDICTION, predictions.get("nextIntervalBytes"), labelValues);
+            setPredictionIfPresent(COUNT_AT_NEXT_INTERVAL_PREDICTION, predictions.get("nextIntervalCount"), labelValues);
+        }
+
+        return CONNECTION_ANALYTICS_REGISTRY;
+    }
+
+    // Sets the gauge child identified by labelValues to the prediction, or leaves it untouched when no prediction exists.
+    private static void setPredictionIfPresent(final Gauge gauge, final Long prediction, final String... labelValues) {
+        if (prediction != null) {
+            gauge.labels(labelValues).set(prediction);
+        }
+    }
+
+    /**
+     * Marks a bulletin as present by setting the bulletin gauge to 1 for the given label combination.
+     * <p>
+     * Null label values are replaced with the empty string, because the Prometheus client rejects null
+     * label values with an IllegalArgumentException.
+     *
+     * @param instanceId    value of the "instance" label
+     * @param componentType value of the "component_type" label
+     * @param componentId   value of the "component_id" label
+     * @param parentId      value of the "parent_id" label
+     * @param nodeAddress   value of the "node_address" label
+     * @param category      value of the "category" label
+     * @param sourceName    value of the "source_name" label
+     * @param sourceId      value of the "source_id" label
+     * @param level         value of the "level" label
+     * @return the registry in which the bulletin gauge is registered
+     */
+    public static CollectorRegistry createBulletinMetrics(String instanceId, String componentType, String componentId, String parentId, String nodeAddress,
+                                                          String category, String sourceName, String sourceId, String level) {
+
+        final String[] labelValues = {instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level};
+        for (int i = 0; i < labelValues.length; i++) {
+            if (labelValues[i] == null) {
+                labelValues[i] = "";
+            }
+        }
+        BULLETIN.labels(labelValues).set(1);
+        return BULLETIN_REGISTRY;
+    }
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/pom.xml
index 037f6b4..c5b3201 100644
--- a/nifi-nar-bundles/nifi-extension-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/pom.xml
@@ -35,6 +35,7 @@
<module>nifi-database-utils</module>
<module>nifi-database-test-utils</module>
<module>nifi-service-utils</module>
+ <module>nifi-prometheus-utils</module>
</modules>
</project>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
index 30bb589..46de87a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
@@ -257,6 +257,11 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-prometheus-utils</artifactId>
+ <version>1.12.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 5107451..fe69a5a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -315,6 +315,11 @@ public interface NiFiServiceFacade {
FlowConfigurationEntity getFlowConfiguration();
/**
+ * Generates the metrics for the flow. Nothing is returned to the caller.
+ * NOTE(review): the implementation in this commit publishes the values into the registries held by
+ * PrometheusMetricsUtil, from which the REST layer reads them - confirm for any other implementation.
+ */
+ void generateFlowMetrics();
+
+ /**
* Updates the configuration for this controller.
*
* @param revision Revision to compare with current base revision
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index c492123..f6d48ee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -94,12 +94,14 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
+import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterReferenceManager;
import org.apache.nifi.parameter.StandardParameterContext;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.authorization.Permissions;
import org.apache.nifi.registry.bucket.Bucket;
@@ -5295,6 +5297,56 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
+    /**
+     * Refreshes the Prometheus registries held by {@link PrometheusMetricsUtil} with the current state of this instance:
+     * component metrics for the root process group, JVM metrics, connection status analytics predictions
+     * and one gauge per bulletin returned by the bulletin board.
+     */
     @Override
+    public void generateFlowMetrics() {
+
+        String instanceId = controllerFacade.getInstanceId();
+        ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root");
+        PrometheusMetricsUtil.createNifiMetrics(rootPGStatus, instanceId, "", "RootProcessGroup",
+                PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue());
+        PrometheusMetricsUtil.createJvmMetrics(JmxJvmMetrics.getInstance(), instanceId);
+
+        // Get Connection Status Analytics (predictions, e.g.)
+        Set<Connection> connections = controllerFacade.getFlowManager().findAllConnections();
+        for (Connection c : connections) {
+            // If a ResourceNotFoundException is thrown, analytics hasn't been enabled
+            try {
+                // The source/destination arguments must follow the callee's parameter order
+                // (sourceId, sourceName, destinationId, destinationName) so that identifiers and names
+                // end up under the matching "*_id" and "*_name" labels.
+                PrometheusMetricsUtil.createConnectionStatusAnalyticsMetrics(controllerFacade.getConnectionStatusAnalytics(c.getIdentifier()),
+                        instanceId,
+                        "Connection",
+                        c.getName(),
+                        c.getIdentifier(),
+                        c.getProcessGroup().getIdentifier(),
+                        c.getSource().getIdentifier(),
+                        c.getSource().getName(),
+                        c.getDestination().getIdentifier(),
+                        c.getDestination().getName()
+                );
+            } catch (ResourceNotFoundException rnfe) {
+                // Analytics is not enabled, so no other connection will have predictions either; stop looking.
+                break;
+            }
+        }
+
+        // Create a query to get all bulletins
+        final BulletinQueryDTO query = new BulletinQueryDTO();
+        BulletinBoardDTO bulletinBoardDTO = getBulletinBoard(query);
+        for (BulletinEntity bulletinEntity : bulletinBoardDTO.getBulletins()) {
+            BulletinDTO bulletin = bulletinEntity.getBulletin();
+            if (bulletin != null) {
+                PrometheusMetricsUtil.createBulletinMetrics(instanceId,
+                        "Bulletin",
+                        String.valueOf(bulletin.getId()),
+                        bulletin.getGroupId() == null ? "" : bulletin.getGroupId(),
+                        bulletin.getNodeAddress() == null ? "" : bulletin.getNodeAddress(),
+                        bulletin.getCategory(),
+                        bulletin.getSourceName(),
+                        bulletin.getSourceId(),
+                        bulletin.getLevel()
+                );
+            }
+        }
+    }
+
+ @Override
public boolean isClustered() {
return controllerFacade.isClustered();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 7072214..6437249 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.web.api;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@@ -43,6 +45,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.NarClassLoadersHolder;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.IllegalClusterResourceRequestException;
@@ -129,6 +132,10 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
import java.text.Collator;
import java.util.ArrayList;
import java.util.Comparator;
@@ -381,6 +388,64 @@ public class FlowResource extends ApplicationResource {
return generateOkResponse(entity).build();
}
+    /**
+     * Streams the current metrics of the flow in the format of the requested producer.
+     * <p>
+     * Only the "prometheus" producer (matched case-insensitively) is supported. For it the metrics are regenerated
+     * and every registry in {@link PrometheusMetricsUtil#ALL_REGISTRIES} is written to the response in the
+     * Prometheus text exposition format (version 0.0.4). Any other producer results in a ResourceNotFoundException.
+     *
+     * @param producer the name of the metrics producer, which determines the output format
+     * @return a plain-text streaming response containing the metrics
+     * @throws InterruptedException declared by the signature; NOTE(review): nothing in this method body visibly throws it
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.WILDCARD)
+    @Path("metrics/{producer}")
+    @ApiOperation(
+            value = "Gets all metrics for the flow from a particular node",
+            response = StreamingOutput.class,
+            authorizations = {
+                    @Authorization(value = "Read - /flow")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response getFlowMetrics(
+            @ApiParam(
+                    value = "The producer for flow file metrics. Each producer may have its own output format.",
+                    required = true
+            )
+            @PathParam("producer") final String producer) throws InterruptedException {
+
+        authorizeFlow();
+
+        if ("prometheus".equalsIgnoreCase(producer)) {
+            // refresh the registries with the current state of the flow
+            serviceFacade.generateFlowMetrics();
+            // generate a streaming response
+            final StreamingOutput response = output -> {
+                // The Prometheus text format is UTF-8, so do not depend on the platform default charset.
+                // try-with-resources closes the writer even when writing a registry fails.
+                try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, java.nio.charset.StandardCharsets.UTF_8))) {
+                    for (CollectorRegistry collectorRegistry : PrometheusMetricsUtil.ALL_REGISTRIES) {
+                        TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
+                        // Flush through the buffered writer; flushing only the underlying stream would
+                        // leave this registry's samples sitting in the writer's buffer.
+                        writer.flush();
+                    }
+                }
+            };
+
+            return generateOkResponse(response)
+                    .type(MediaType.TEXT_PLAIN_TYPE)
+                    .build();
+        } else {
+            throw new ResourceNotFoundException("The specified producer is missing or invalid.");
+        }
+    }
+
// -------------------
// controller services
// -------------------
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/pom.xml b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/pom.xml
index 300c601..367cc49 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/pom.xml
@@ -16,9 +16,6 @@
<artifactId>nifi-prometheus-bundle</artifactId>
<version>1.12.0-SNAPSHOT</version>
</parent>
-<properties>
- <prometheus.version>0.3.0</prometheus.version>
-</properties>
<artifactId>nifi-prometheus-reporting-task</artifactId>
<description>Prometheus /metrics http endpoint for monitoring</description>
@@ -26,20 +23,6 @@
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-metrics</artifactId>
- <version>1.12.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-utils</artifactId>
- <version>1.12.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-sink-api</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>
@@ -53,23 +36,10 @@
<artifactId>nifi-record</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>
- <!-- The client -->
- <dependency>
- <groupId>io.prometheus</groupId>
- <artifactId>simpleclient</artifactId>
- <version>${prometheus.version}</version>
- </dependency>
- <!-- Hotspot JVM metrics -->
<dependency>
- <groupId>io.prometheus</groupId>
- <artifactId>simpleclient_hotspot</artifactId>
- <version>${prometheus.version}</version>
- </dependency>
- <!-- Exposition servlet -->
- <dependency>
- <groupId>io.prometheus</groupId>
- <artifactId>simpleclient_servlet</artifactId>
- <version>${prometheus.version}</version>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-prometheus-utils</artifactId>
+ <version>1.12.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
index 75ca809..3a4bd20 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
@@ -28,7 +28,7 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.ReportingContext;
-import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
@@ -36,6 +36,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Server;
@@ -62,13 +63,22 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
private volatile Map<String, Gauge> gauges;
private static final CollectorRegistry RECORD_REGISTRY = new CollectorRegistry();
+    // Optional SSL context service; per the description, its presence switches the metrics server from HTTP to HTTPS.
+    // The description literals are concatenated, so the first one must end with a space to avoid "willaccept".
+    public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
+            .name("prometheus-reporting-task-ssl-context")
+            .displayName("SSL Context Service")
+            .description("The SSL Context Service to use in order to secure the server. If specified, the server will "
+                    + "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
+            .required(false)
+            .identifiesControllerService(RestrictedSSLContextService.class)
+            .build();
+
private static final List<PropertyDescriptor> properties;
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT);
props.add(PrometheusMetricsUtil.INSTANCE_ID);
- props.add(PrometheusMetricsUtil.SSL_CONTEXT);
+ props.add(SSL_CONTEXT);
props.add(PrometheusMetricsUtil.CLIENT_AUTH);
properties = Collections.unmodifiableList(props);
}
@@ -81,7 +91,7 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
@OnEnabled
public void onScheduled(final ConfigurationContext context) {
RECORD_REGISTRY.clear();
- SSLContextService sslContextService = context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT).asControllerService(SSLContextService.class);
+ SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).getValue();
try {
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
index c8a269a..91b1bc0 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
@@ -36,14 +36,15 @@ import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
-import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Server;
-import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
-import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
-import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
+import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
+import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
+import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
@Tags({ "reporting", "prometheus", "metrics", "time series data" })
@CapabilityDescription("Reports metrics in Prometheus format by creating /metrics http endpoint which can be used for external monitoring of the application."
@@ -53,6 +54,15 @@ public class PrometheusReportingTask extends AbstractReportingTask {
private PrometheusServer prometheusServer;
+ public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
+ .name("prometheus-reporting-task-ssl-context")
+ .displayName("SSL Context Service")
+ .description("The SSL Context Service to use in order to secure the server. If specified, the server will " // trailing space: literal continues on the next line
+ + "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
+ .required(false)
+ .identifiesControllerService(RestrictedSSLContextService.class)
+ .build();
+
public static final PropertyDescriptor METRICS_STRATEGY = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-metrics-strategy")
.displayName("Metrics Reporting Strategy")
@@ -79,7 +89,7 @@ public class PrometheusReportingTask extends AbstractReportingTask {
props.add(PrometheusMetricsUtil.INSTANCE_ID);
props.add(METRICS_STRATEGY);
props.add(SEND_JVM_METRICS);
- props.add(PrometheusMetricsUtil.SSL_CONTEXT);
+ props.add(SSL_CONTEXT);
props.add(PrometheusMetricsUtil.CLIENT_AUTH);
properties = Collections.unmodifiableList(props);
}
@@ -91,7 +101,7 @@ public class PrometheusReportingTask extends AbstractReportingTask {
@OnScheduled
public void onScheduled(final ConfigurationContext context) {
- SSLContextService sslContextService = context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT).asControllerService(SSLContextService.class);
+ SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).getValue();
try {
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
index 1aaf095..02a54b9 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
@@ -30,7 +30,7 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.ListRecordSet;
@@ -128,7 +128,7 @@ public class TestPrometheusRecordSink {
final PropertyValue pValue = mock(StandardPropertyValue.class);
when(context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT)).thenReturn(new MockPropertyValue(portString));
- when(context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT)).thenReturn(pValue);
+ when(context.getProperty(PrometheusRecordSink.SSL_CONTEXT)).thenReturn(pValue);
when(pValue.asControllerService(SSLContextService.class)).thenReturn(null);
final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(task, UUID.randomUUID().toString(), logger, stateManager);
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
index 0943e0b..381c68f 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
@@ -33,7 +33,7 @@ import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockConfigurationContext;