You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/09/18 14:14:00 UTC

[nifi] branch main updated: NIFI-7811: Prevent NPE and bad behavior when PrometheusReportingTask can't start

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f531b7a  NIFI-7811: Prevent NPE and bad behavior when PrometheusReportingTask can't start
f531b7a is described below

commit f531b7a780619ba6b0662a181a31d42c16f795cf
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Tue Sep 15 13:31:32 2020 -0400

    NIFI-7811: Prevent NPE and bad behavior when PrometheusReportingTask can't start
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4531.
---
 .../reporting/prometheus/PrometheusRecordSink.java | 20 ++++++---
 .../prometheus/PrometheusReportingTask.java        | 44 ++++++++++++-------
 .../reporting/prometheus/PrometheusServer.java     | 50 +++++++++++++---------
 .../prometheus/PrometheusReportingTaskIT.java      | 33 ++++++++++----
 .../prometheus/TestPrometheusRecordSink.java       | 18 ++++++++
 5 files changed, 117 insertions(+), 48 deletions(-)

diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
index 91fd7c2..bc4009d 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
@@ -26,6 +26,7 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.record.sink.RecordSinkService;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
@@ -120,7 +121,8 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
             prometheusServer.setMetricsCollectors(metricsCollectors);
             getLogger().info("Started JETTY server");
         } catch (Exception e) {
-            getLogger().error("Failed to start Jetty server", e);
+            // Don't allow this to finish successfully, onTrigger should not be called if the Jetty server wasn't started
+            throw new ProcessException("Failed to start Jetty server", e);
         }
     }
 
@@ -181,15 +183,23 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
 
     @OnDisabled
     public void onStopped() throws Exception {
-        Server server = prometheusServer.getServer();
-        server.stop();
+        if (prometheusServer != null) {
+            Server server = prometheusServer.getServer();
+            if (server != null) {
+                server.stop();
+            }
+        }
         recordSchema = null;
     }
 
     @OnShutdown
     public void onShutDown() throws Exception {
-        Server server = prometheusServer.getServer();
-        server.stop();
+        if (prometheusServer != null) {
+            Server server = prometheusServer.getServer();
+            if (server != null) {
+                server.stop();
+            }
+        }
         recordSchema = null;
     }
 
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
index 60c64f7..774d036 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
@@ -17,12 +17,6 @@
 
 package org.apache.nifi.reporting.prometheus;
 
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Function;
-
 import io.prometheus.client.CollectorRegistry;
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -34,23 +28,32 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
 import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
 import org.apache.nifi.reporting.AbstractReportingTask;
 import org.apache.nifi.reporting.ReportingContext;
-import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.eclipse.jetty.server.Server;
 
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
 import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
 import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
 import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
 
 @Tags({ "reporting", "prometheus", "metrics", "time series data" })
-@CapabilityDescription("Reports metrics in Prometheus format by creating /metrics http endpoint which can be used for external monitoring of the application."
-        + " The reporting task reports a set of metrics regarding the JVM (optional) and the NiFi instance")
+@CapabilityDescription("Reports metrics in Prometheus format by creating a /metrics HTTP(S) endpoint which can be used for external monitoring of the application."
+        + " The reporting task reports a set of metrics regarding the JVM (optional) and the NiFi instance. Note that if the underlying Jetty server (i.e. the "
+        + "Prometheus endpoint) cannot be started (for example if two PrometheusReportingTask instances are started on the same port), this may cause a delay in "
+        + "shutting down NiFi while it waits for the server resources to be cleaned up.")
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "60 sec")
 public class PrometheusReportingTask extends AbstractReportingTask {
 
@@ -145,24 +148,35 @@ public class PrometheusReportingTask extends AbstractReportingTask {
             this.prometheusServer.setMetricsCollectors(metricsCollectors);
             getLogger().info("Started JETTY server");
         } catch (Exception e) {
-            getLogger().error("Failed to start Jetty server", e);
+            // Don't allow this to finish successfully, onTrigger should not be called if the Jetty server wasn't started
+            throw new ProcessException("Failed to start Jetty server", e);
         }
     }
 
     @OnStopped
     public void OnStopped() throws Exception {
-        Server server = this.prometheusServer.getServer();
-        server.stop();
+        if (prometheusServer != null) {
+            Server server = prometheusServer.getServer();
+            if (server != null) {
+                server.stop();
+            }
+        }
     }
 
     @OnShutdown
     public void onShutDown() throws Exception {
-        Server server = prometheusServer.getServer();
-        server.stop();
+        if (prometheusServer != null) {
+            Server server = prometheusServer.getServer();
+            if (server != null) {
+                server.stop();
+            }
+        }
     }
 
     @Override
     public void onTrigger(final ReportingContext context) {
-        this.prometheusServer.setReportingContext(context);
+        if (prometheusServer != null) {
+            prometheusServer.setReportingContext(context);
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
index d57f1c1..cd815f5 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
@@ -17,20 +17,8 @@
 
 package org.apache.nifi.reporting.prometheus;
 
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.HttpURLConnection;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Function;
-
-import javax.servlet.ServletException;
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.ssl.SSLContextService;
@@ -45,8 +33,18 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 
-import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.exporter.common.TextFormat;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
 
 public class PrometheusServer {
     private static ComponentLog logger;
@@ -86,10 +84,16 @@ public class PrometheusServer {
         PrometheusServer.logger = logger;
         metricsCollectors = Collections.emptyList();
         this.server = new Server(addr);
-
         this.handler = new ServletContextHandler(server, "/metrics");
         this.handler.addServlet(new ServletHolder(new MetricsServlet()), "/");
-        this.server.start();
+        try {
+            this.server.start();
+        } catch (Exception e) {
+            // If Jetty couldn't start, stop it explicitly to avoid dangling threads
+            logger.debug("PrometheusServer: Couldn't start Jetty server, stopping manually");
+            this.server.stop();
+            throw e;
+        }
     }
 
     public PrometheusServer(int addr, SSLContextService sslContextService, ComponentLog logger, boolean needClientAuth, boolean wantClientAuth) throws Exception {
@@ -108,8 +112,14 @@ public class PrometheusServer {
                 new HttpConnectionFactory(httpsConfiguration));
         https.setPort(addr);
         this.server.setConnectors(new Connector[]{https});
-        this.server.start();
-
+        try {
+            this.server.start();
+        } catch (Exception e) {
+            // If Jetty couldn't start, stop it explicitly to avoid dangling threads
+            logger.debug("PrometheusServer: Couldn't start Jetty server, stopping manually");
+            this.server.stop();
+            throw e;
+        }
     }
 
     private SslContextFactory createSslFactory(final SSLContextService sslService, boolean needClientAuth, boolean wantClientAuth) {
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTaskIT.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTaskIT.java
index 52a7444..a23bd30c 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTaskIT.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTaskIT.java
@@ -16,13 +16,6 @@
  */
 package org.apache.nifi.reporting.prometheus;
 
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
@@ -32,8 +25,9 @@ import org.apache.http.util.EntityUtils;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.RunStatus;
-import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockComponentLog;
 import org.apache.nifi.util.MockConfigurationContext;
@@ -44,6 +38,15 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
+
 public class PrometheusReportingTaskIT {
     private static final String TEST_INIT_CONTEXT_ID = "test-init-context-id";
     private static final String TEST_INIT_CONTEXT_NAME = "test-init-context-name";
@@ -183,4 +186,18 @@ public class PrometheusReportingTaskIT {
             // Ignore
         }
     }
+
+    @Test
+    public void testTwoInstances() throws IOException, InterruptedException, InitializationException {
+        testedReportingTask.initialize(reportingInitContextStub);
+        testedReportingTask.onScheduled(configurationContextStub);
+        PrometheusReportingTask testedReportingTask2 = new PrometheusReportingTask();
+        testedReportingTask2.initialize(reportingInitContextStub);
+        try {
+            testedReportingTask2.onScheduled(configurationContextStub);
+            fail("Should have reported Address In Use");
+        } catch (ProcessException pe) {
+            // Do nothing, this is the expected behavior
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
index 34bb657..7dd3e4a 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
@@ -29,6 +29,7 @@ import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -59,6 +60,7 @@ import java.util.UUID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -111,6 +113,22 @@ public class TestPrometheusRecordSink {
         }
     }
 
+    @Test
+    public void testTwoInstances() throws InitializationException {
+        PrometheusRecordSink sink1 = initTask();
+        try {
+            PrometheusRecordSink sink2 = initTask();
+            fail("Should have reported Address In Use");
+        } catch (ProcessException pe) {
+            // Do nothing, this is the expected behavior
+        }
+        try {
+            sink1.onStopped();
+        } catch (Exception e) {
+            // Do nothing, just need to shut down the server before the next run
+        }
+    }
+
     private String getMetrics() throws IOException {
         URL url = new URL("http://localhost:" + portString + "/metrics");
         HttpURLConnection con = (HttpURLConnection) url.openConnection();