You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/03/01 16:14:57 UTC

[nifi] branch main updated: NIFI-8263: Maximum Thread Pool Size property introduced in ListenHTTP

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a27b23  NIFI-8263: Maximum Thread Pool Size property introduced in ListenHTTP
4a27b23 is described below

commit 4a27b23b1f5b5abd159ee541b5d912c548dafd58
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Thu Feb 25 18:31:38 2021 +0100

    NIFI-8263: Maximum Thread Pool Size property introduced in ListenHTTP
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4847.
---
 .../nifi/processors/standard/ListenHTTP.java       | 22 +++++++-
 .../nifi/processors/standard/TestListenHTTP.java   | 64 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index e918a5e..b76c70f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -226,6 +226,20 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
             .defaultValue(ClientAuthentication.AUTO.name())
             .dependsOn(SSL_CONTEXT_SERVICE)
             .build();
+    public static final PropertyDescriptor MAX_THREAD_POOL_SIZE = new PropertyDescriptor.Builder()
+            .name("max-thread-pool-size")
+            .displayName("Maximum Thread Pool Size")
+            .description("The maximum number of threads to be used by the embedded Jetty server. "
+                    + "The value can be set between 8 and 1000. "
+                    + "The value of this property affects the performance of the flows and the operating system, therefore "
+                    + "the default value should only be changed in justified cases. "
+                    + "A value that is less than the default value may be suitable "
+                    + "if only a small number of HTTP clients connect to the server. A greater value may be suitable "
+                    + "if a large number of HTTP clients are expected to make requests to the server simultaneously.")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(8L, 1000L, true))
+            .defaultValue("200")
+            .build();
 
     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
     public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
@@ -289,6 +303,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         descriptors.add(RETURN_CODE);
         descriptors.add(MULTIPART_REQUEST_MAX_SIZE);
         descriptors.add(MULTIPART_READ_BUFFER_SIZE);
+        descriptors.add(MAX_THREAD_POOL_SIZE);
         this.properties = Collections.unmodifiableList(descriptors);
     }
 
@@ -321,6 +336,10 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         shutdownHttpServer(toShutdown);
     }
 
+    Server getServer() {
+        return this.server;
+    }
+
     private void shutdownHttpServer(Server toShutdown) {
         try {
             toShutdown.stop();
@@ -344,6 +363,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         final int returnCode = context.getProperty(RETURN_CODE).asInteger();
         long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
         int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        int maxThreadPoolSize = context.getProperty(MAX_THREAD_POOL_SIZE).asInteger();
         throttlerRef.set(streamThrottler);
 
         final boolean sslRequired = sslContextService != null;
@@ -351,7 +371,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         final ClientAuthentication clientAuthentication = getClientAuthentication(sslContextService, clientAuthenticationProperty);
 
         // thread pool for the jetty instance
-        final QueuedThreadPool threadPool = new QueuedThreadPool();
+        final QueuedThreadPool threadPool = new QueuedThreadPool(maxThreadPoolSize);
         threadPool.setName(String.format("%s (%s) Web Server", getClass().getSimpleName(), getIdentifier()));
 
         // create the server instance
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index 09caa5c..22cf181 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -67,6 +67,8 @@ import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.util.thread.ThreadPool;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -405,6 +407,68 @@ public class TestListenHTTP {
         assertThrows(SSLHandshakeException.class, sslSocket::startHandshake);
     }
 
+    @Test
+    public void testMaxThreadPoolSizeTooLow() {
+        // GIVEN, WHEN
+        runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+        runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "7");
+
+        // THEN
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testMaxThreadPoolSizeTooHigh() {
+        // GIVEN, WHEN
+        runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+        runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "1001");
+
+        // THEN
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testMaxThreadPoolSizeOkLowerBound() {
+        // GIVEN, WHEN
+        runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+        runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "8");
+
+        // THEN
+        runner.assertValid();
+    }
+
+    @Test
+    public void testMaxThreadPoolSizeOkUpperBound() {
+        // GIVEN, WHEN
+        runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+        runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "1000");
+
+        // THEN
+        runner.assertValid();
+    }
+
+    @Test
+    public void testMaxThreadPoolSizeSpecifiedInThePropertyIsSetInTheServerInstance() {
+        // GIVEN
+        int maxThreadPoolSize = 201;
+        runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+        runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, Integer.toString(maxThreadPoolSize));
+
+        // WHEN
+        startWebServer();
+
+        // THEN
+        Server server = proc.getServer();
+        ThreadPool threadPool = server.getThreadPool();
+        ThreadPool.SizedThreadPool sizedThreadPool = (ThreadPool.SizedThreadPool) threadPool;
+        assertEquals(maxThreadPoolSize, sizedThreadPool.getMaxThreads());
+    }
+
     private void startSecureServer() {
         runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
         runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);