You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/03/01 16:14:57 UTC
[nifi] branch main updated: NIFI-8263: Maximum Thread Pool Size
property introduced in ListenHTTP
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 4a27b23 NIFI-8263: Maximum Thread Pool Size property introduced in ListenHTTP
4a27b23 is described below
commit 4a27b23b1f5b5abd159ee541b5d912c548dafd58
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Thu Feb 25 18:31:38 2021 +0100
NIFI-8263: Maximum Thread Pool Size property introduced in ListenHTTP
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4847.
---
.../nifi/processors/standard/ListenHTTP.java | 22 +++++++-
.../nifi/processors/standard/TestListenHTTP.java | 64 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index e918a5e..b76c70f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -226,6 +226,20 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
.defaultValue(ClientAuthentication.AUTO.name())
.dependsOn(SSL_CONTEXT_SERVICE)
.build();
+ public static final PropertyDescriptor MAX_THREAD_POOL_SIZE = new PropertyDescriptor.Builder()
+ .name("max-thread-pool-size")
+ .displayName("Maximum Thread Pool Size")
+ .description("The maximum number of threads to be used by the embedded Jetty server. "
+ + "The value can be set between 8 and 1000. "
+ + "The value of this property affects the performance of the flows and the operating system, therefore "
+ + "the default value should only be changed in justified cases. "
+ + "A value that is less than the default value may be suitable "
+ + "if only a small number of HTTP clients connect to the server. A greater value may be suitable "
+ + "if a large number of HTTP clients are expected to make requests to the server simultaneously.")
+ .required(true)
+ .addValidator(StandardValidators.createLongValidator(8L, 1000L, true))
+ .defaultValue("200")
+ .build();
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
@@ -289,6 +303,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
descriptors.add(RETURN_CODE);
descriptors.add(MULTIPART_REQUEST_MAX_SIZE);
descriptors.add(MULTIPART_READ_BUFFER_SIZE);
+ descriptors.add(MAX_THREAD_POOL_SIZE);
this.properties = Collections.unmodifiableList(descriptors);
}
@@ -321,6 +336,10 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
shutdownHttpServer(toShutdown);
}
+ Server getServer() {
+ return this.server;
+ }
+
private void shutdownHttpServer(Server toShutdown) {
try {
toShutdown.stop();
@@ -344,6 +363,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
final int returnCode = context.getProperty(RETURN_CODE).asInteger();
long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ int maxThreadPoolSize = context.getProperty(MAX_THREAD_POOL_SIZE).asInteger();
throttlerRef.set(streamThrottler);
final boolean sslRequired = sslContextService != null;
@@ -351,7 +371,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
final ClientAuthentication clientAuthentication = getClientAuthentication(sslContextService, clientAuthenticationProperty);
// thread pool for the jetty instance
- final QueuedThreadPool threadPool = new QueuedThreadPool();
+ final QueuedThreadPool threadPool = new QueuedThreadPool(maxThreadPoolSize);
threadPool.setName(String.format("%s (%s) Web Server", getClass().getSimpleName(), getIdentifier()));
// create the server instance
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index 09caa5c..22cf181 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -67,6 +67,8 @@ import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.util.thread.ThreadPool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -405,6 +407,68 @@ public class TestListenHTTP {
assertThrows(SSLHandshakeException.class, sslSocket::startHandshake);
}
+ @Test
+ public void testMaxThreadPoolSizeTooLow() {
+ // GIVEN, WHEN
+ runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+ runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+ runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "7");
+
+ // THEN
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testMaxThreadPoolSizeTooHigh() {
+ // GIVEN, WHEN
+ runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+ runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+ runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "1001");
+
+ // THEN
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testMaxThreadPoolSizeOkLowerBound() {
+ // GIVEN, WHEN
+ runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+ runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+ runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "8");
+
+ // THEN
+ runner.assertValid();
+ }
+
+ @Test
+ public void testMaxThreadPoolSizeOkUpperBound() {
+ // GIVEN, WHEN
+ runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+ runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+ runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "1000");
+
+ // THEN
+ runner.assertValid();
+ }
+
+ @Test
+ public void testMaxThreadPoolSizeSpecifiedInThePropertyIsSetInTheServerInstance() {
+ // GIVEN
+ int maxThreadPoolSize = 201;
+ runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+ runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+ runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, Integer.toString(maxThreadPoolSize));
+
+ // WHEN
+ startWebServer();
+
+ // THEN
+ Server server = proc.getServer();
+ ThreadPool threadPool = server.getThreadPool();
+ ThreadPool.SizedThreadPool sizedThreadPool = (ThreadPool.SizedThreadPool) threadPool;
+ assertEquals(maxThreadPoolSize, sizedThreadPool.getMaxThreads());
+ }
+
private void startSecureServer() {
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);