You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/09/18 14:14:00 UTC
[nifi] branch main updated: NIFI-7811: Prevent NPE and bad behavior
when PrometheusReportingTask can't start
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f531b7a NIFI-7811: Prevent NPE and bad behavior when PrometheusReportingTask can't start
f531b7a is described below
commit f531b7a780619ba6b0662a181a31d42c16f795cf
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Tue Sep 15 13:31:32 2020 -0400
NIFI-7811: Prevent NPE and bad behavior when PrometheusReportingTask can't start
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4531.
---
.../reporting/prometheus/PrometheusRecordSink.java | 20 ++++++---
.../prometheus/PrometheusReportingTask.java | 44 ++++++++++++-------
.../reporting/prometheus/PrometheusServer.java | 50 +++++++++++++---------
.../prometheus/PrometheusReportingTaskIT.java | 33 ++++++++++----
.../prometheus/TestPrometheusRecordSink.java | 18 ++++++++
5 files changed, 117 insertions(+), 48 deletions(-)
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
index 91fd7c2..bc4009d 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
@@ -26,6 +26,7 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
@@ -120,7 +121,8 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
prometheusServer.setMetricsCollectors(metricsCollectors);
getLogger().info("Started JETTY server");
} catch (Exception e) {
- getLogger().error("Failed to start Jetty server", e);
+ // Don't allow this to finish successfully, onTrigger should not be called if the Jetty server wasn't started
+ throw new ProcessException("Failed to start Jetty server", e);
}
}
@@ -181,15 +183,23 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
@OnDisabled
public void onStopped() throws Exception {
- Server server = prometheusServer.getServer();
- server.stop();
+ if (prometheusServer != null) {
+ Server server = prometheusServer.getServer();
+ if (server != null) {
+ server.stop();
+ }
+ }
recordSchema = null;
}
@OnShutdown
public void onShutDown() throws Exception {
- Server server = prometheusServer.getServer();
- server.stop();
+ if (prometheusServer != null) {
+ Server server = prometheusServer.getServer();
+ if (server != null) {
+ server.stop();
+ }
+ }
recordSchema = null;
}
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
index 60c64f7..774d036 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
@@ -17,12 +17,6 @@
package org.apache.nifi.reporting.prometheus;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Function;
-
import io.prometheus.client.CollectorRegistry;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -34,23 +28,32 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
-import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Server;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
@Tags({ "reporting", "prometheus", "metrics", "time series data" })
-@CapabilityDescription("Reports metrics in Prometheus format by creating /metrics http endpoint which can be used for external monitoring of the application."
- + " The reporting task reports a set of metrics regarding the JVM (optional) and the NiFi instance")
+@CapabilityDescription("Reports metrics in Prometheus format by creating a /metrics HTTP(S) endpoint which can be used for external monitoring of the application."
+ + " The reporting task reports a set of metrics regarding the JVM (optional) and the NiFi instance. Note that if the underlying Jetty server (i.e. the "
+ + "Prometheus endpoint) cannot be started (for example if two PrometheusReportingTask instances are started on the same port), this may cause a delay in "
+ + "shutting down NiFi while it waits for the server resources to be cleaned up.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "60 sec")
public class PrometheusReportingTask extends AbstractReportingTask {
@@ -145,24 +148,35 @@ public class PrometheusReportingTask extends AbstractReportingTask {
this.prometheusServer.setMetricsCollectors(metricsCollectors);
getLogger().info("Started JETTY server");
} catch (Exception e) {
- getLogger().error("Failed to start Jetty server", e);
+ // Don't allow this to finish successfully, onTrigger should not be called if the Jetty server wasn't started
+ throw new ProcessException("Failed to start Jetty server", e);
}
}
@OnStopped
public void OnStopped() throws Exception {
- Server server = this.prometheusServer.getServer();
- server.stop();
+ if (prometheusServer != null) {
+ Server server = prometheusServer.getServer();
+ if (server != null) {
+ server.stop();
+ }
+ }
}
@OnShutdown
public void onShutDown() throws Exception {
- Server server = prometheusServer.getServer();
- server.stop();
+ if (prometheusServer != null) {
+ Server server = prometheusServer.getServer();
+ if (server != null) {
+ server.stop();
+ }
+ }
}
@Override
public void onTrigger(final ReportingContext context) {
- this.prometheusServer.setReportingContext(context);
+ if (prometheusServer != null) {
+ prometheusServer.setReportingContext(context);
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
index d57f1c1..cd815f5 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
@@ -17,20 +17,8 @@
package org.apache.nifi.reporting.prometheus;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.HttpURLConnection;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Function;
-
-import javax.servlet.ServletException;
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.ssl.SSLContextService;
@@ -45,8 +33,18 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.exporter.common.TextFormat;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
public class PrometheusServer {
private static ComponentLog logger;
@@ -86,10 +84,16 @@ public class PrometheusServer {
PrometheusServer.logger = logger;
metricsCollectors = Collections.emptyList();
this.server = new Server(addr);
-
this.handler = new ServletContextHandler(server, "/metrics");
this.handler.addServlet(new ServletHolder(new MetricsServlet()), "/");
- this.server.start();
+ try {
+ this.server.start();
+ } catch (Exception e) {
+ // If Jetty couldn't start, stop it explicitly to avoid dangling threads
+ logger.debug("PrometheusServer: Couldn't start Jetty server, stopping manually");
+ this.server.stop();
+ throw e;
+ }
}
public PrometheusServer(int addr, SSLContextService sslContextService, ComponentLog logger, boolean needClientAuth, boolean wantClientAuth) throws Exception {
@@ -108,8 +112,14 @@ public class PrometheusServer {
new HttpConnectionFactory(httpsConfiguration));
https.setPort(addr);
this.server.setConnectors(new Connector[]{https});
- this.server.start();
-
+ try {
+ this.server.start();
+ } catch (Exception e) {
+ // If Jetty couldn't start, stop it explicitly to avoid dangling threads
+ logger.debug("PrometheusServer: Couldn't start Jetty server, stopping manually");
+ this.server.stop();
+ throw e;
+ }
}
private SslContextFactory createSslFactory(final SSLContextService sslService, boolean needClientAuth, boolean wantClientAuth) {
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTaskIT.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTaskIT.java
index 52a7444..a23bd30c 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTaskIT.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTaskIT.java
@@ -16,13 +16,6 @@
*/
package org.apache.nifi.reporting.prometheus;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
@@ -32,8 +25,9 @@ import org.apache.http.util.EntityUtils;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.RunStatus;
-import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockConfigurationContext;
@@ -44,6 +38,15 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
+
public class PrometheusReportingTaskIT {
private static final String TEST_INIT_CONTEXT_ID = "test-init-context-id";
private static final String TEST_INIT_CONTEXT_NAME = "test-init-context-name";
@@ -183,4 +186,18 @@ public class PrometheusReportingTaskIT {
// Ignore
}
}
+
+ @Test
+ public void testTwoInstances() throws IOException, InterruptedException, InitializationException {
+ testedReportingTask.initialize(reportingInitContextStub);
+ testedReportingTask.onScheduled(configurationContextStub);
+ PrometheusReportingTask testedReportingTask2 = new PrometheusReportingTask();
+ testedReportingTask2.initialize(reportingInitContextStub);
+ try {
+ testedReportingTask2.onScheduled(configurationContextStub);
+ fail("Should have reported Address In Use");
+ } catch (ProcessException pe) {
+ // Do nothing, this is the expected behavior
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
index 34bb657..7dd3e4a 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
@@ -29,6 +29,7 @@ import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -59,6 +60,7 @@ import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -111,6 +113,22 @@ public class TestPrometheusRecordSink {
}
}
+ @Test
+ public void testTwoInstances() throws InitializationException {
+ PrometheusRecordSink sink1 = initTask();
+ try {
+ PrometheusRecordSink sink2 = initTask();
+ fail("Should have reported Address In Use");
+ } catch (ProcessException pe) {
+ // Do nothing, this is the expected behavior
+ }
+ try {
+ sink1.onStopped();
+ } catch (Exception e) {
+ // Do nothing, just need to shut down the server before the next run
+ }
+ }
+
private String getMetrics() throws IOException {
URL url = new URL("http://localhost:" + portString + "/metrics");
HttpURLConnection con = (HttpURLConnection) url.openConnection();