You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/04/07 18:14:40 UTC

[nifi] branch master updated: NIFI-7314 HandleHttpRequest stops Jetty in OnUnscheduled instead of OnStopped. Also reject pending requests and clean their queue when shutting down.

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 09cece8  NIFI-7314 HandleHttpRequest stops Jetty in OnUnscheduled instead of OnStopped. Also reject pending requests and clean their queue when shutting down.
09cece8 is described below

commit 09cece8e9936f7d8e6662e6f887444e85a36f79f
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Tue Apr 7 15:35:20 2020 +0200

    NIFI-7314 HandleHttpRequest stops Jetty in OnUnscheduled instead of OnStopped. Also reject pending requests and clean their queue when shutting down.
    
    NIFI-7314 In HandleHttpRequest return 503 when rejecting pending requests before shutdown.
    NIFI-7314 In HandleHttpRequest add logs and better response message during cleanup.
    
    This closes #4191.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../processors/standard/HandleHttpRequest.java     |  46 +-
 .../standard/ITestHandleHttpRequest.java           | 567 +++++++++++++--------
 2 files changed, 400 insertions(+), 213 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index 3717a33..66b940a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -25,7 +25,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.AllowableValue;
@@ -304,6 +304,7 @@ public class HandleHttpRequest extends AbstractProcessor {
     }
 
     private volatile Server server;
+    private volatile boolean ready;
     private AtomicBoolean initialized = new AtomicBoolean(false);
     private volatile BlockingQueue<HttpRequestContainer> containerQueue;
     private AtomicBoolean runOnPrimary = new AtomicBoolean(false);
@@ -323,7 +324,7 @@ public class HandleHttpRequest extends AbstractProcessor {
         initialized.set(false);
     }
 
-    private synchronized void initializeServer(final ProcessContext context) throws Exception {
+    synchronized void initializeServer(final ProcessContext context) throws Exception {
         if(initialized.get()){
             return;
         }
@@ -461,6 +462,12 @@ public class HandleHttpRequest extends AbstractProcessor {
 
                     response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor queue is full");
                     return;
+                } else if (!ready) {
+                    getLogger().warn("Request from {} cannot be processed, processor is being shut down; responding with SERVICE_UNAVAILABLE",
+                        new Object[]{request.getRemoteAddr()});
+
+                    response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down");
+                    return;
                 }
 
                 // Right now, that information, though, is only in the ProcessSession, not the ProcessContext,
@@ -491,6 +498,7 @@ public class HandleHttpRequest extends AbstractProcessor {
         getLogger().info("Server started and listening on port " + getPort());
 
         initialized.set(true);
+        ready = true;
     }
 
     protected int getPort() {
@@ -535,10 +543,13 @@ public class HandleHttpRequest extends AbstractProcessor {
         return sslFactory;
     }
 
-    @OnStopped
+    @OnUnscheduled
     public void shutdown() throws Exception {
+        ready = false;
+
         if (server != null) {
             getLogger().debug("Shutting down server");
+            rejectPendingRequests();
             server.stop();
             server.destroy();
             server.join();
@@ -547,6 +558,35 @@ public class HandleHttpRequest extends AbstractProcessor {
         }
     }
 
+    void rejectPendingRequests() {
+        HttpRequestContainer container;
+        while ((container = getNextContainer()) != null) {
+            try {
+                getLogger().warn("Rejecting request from {} during cleanup after processor shutdown; responding with SERVICE_UNAVAILABLE",
+                    new Object[]{container.getRequest().getRemoteAddr()});
+
+                HttpServletResponse response = container.getResponse();
+                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down");
+                container.getContext().complete();
+            } catch (final IOException e) {
+                getLogger().warn("Failed to send HTTP response to {} due to {}",
+                    new Object[]{container.getRequest().getRemoteAddr(), e});
+            }
+        }
+    }
+
+    private HttpRequestContainer getNextContainer() {
+        HttpRequestContainer container;
+        try {
+            container = containerQueue.poll(2, TimeUnit.SECONDS);
+        } catch (final InterruptedException e) {
+            getLogger().warn("Interrupted while polling for " + HttpRequestContainer.class.getSimpleName() + " during cleanup.");
+            container = null;
+        }
+
+        return container;
+    }
+
     @OnPrimaryNodeStateChange
     public void onPrimaryNodeChange(final PrimaryNodeState newState) {
         if (runOnPrimary.get() && newState.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
index f65c0b9..391d22d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
@@ -20,6 +20,8 @@ import com.google.api.client.util.Charsets;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.io.Files;
+import okhttp3.Call;
+import okhttp3.Callback;
 import okhttp3.MediaType;
 import okhttp3.MultipartBody;
 import okhttp3.OkHttpClient;
@@ -28,17 +30,17 @@ import okhttp3.RequestBody;
 import okhttp3.Response;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.http.HttpContextMap;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.standard.util.HTTPUtils;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.security.util.SslContextFactory;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
 import org.apache.nifi.ssl.StandardSSLContextService;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -49,22 +51,31 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class ITestHandleHttpRequest {
 
+    private HandleHttpRequest processor;
+
     private static Map<String, String> getTruststoreProperties() {
         final Map<String, String> props = new HashMap<>();
         props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/truststore.jks");
@@ -103,9 +114,21 @@ public class ITestHandleHttpRequest {
         return service.createSSLContext(clientAuth);
     }
 
+    @After
+    public void tearDown() throws Exception {
+        if (processor != null) {
+            processor.shutdown();
+        }
+    }
+
     @Test(timeout=30000)
     public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException, InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+        CountDownLatch serverReady = new CountDownLatch(1);
+        CountDownLatch requestSent = new CountDownLatch(1);
+
+        processor = createProcessor(serverReady, requestSent);
+
+        final TestRunner runner = TestRunners.newTestRunner(processor);
         runner.setProperty(HandleHttpRequest.PORT, "0");
 
         final MockHttpContextMap contextMap = new MockHttpContextMap();
@@ -113,73 +136,65 @@ public class ITestHandleHttpRequest {
         runner.enableControllerService(contextMap);
         runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
 
-        // trigger processor to stop but not shutdown.
-        runner.run(1, false);
-        try {
-            final Thread httpThread = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
-                        final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:"
-                                + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
-                        connection.setDoOutput(false);
-                        connection.setRequestMethod("GET");
-                        connection.setRequestProperty("header1", "value1");
-                        connection.setRequestProperty("header2", "");
-                        connection.setRequestProperty("header3", "apple=orange");
-                        connection.setConnectTimeout(3000);
-                        connection.setReadTimeout(3000);
-
-                        StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
-                    } catch (final Throwable t) {
-                        t.printStackTrace();
-                        Assert.fail(t.toString());
-                    }
+        final Thread httpThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    serverReady.await();
+                    final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+                    final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:"
+                            + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
+
+                    connection.setDoOutput(false);
+                    connection.setRequestMethod("GET");
+                    connection.setRequestProperty("header1", "value1");
+                    connection.setRequestProperty("header2", "");
+                    connection.setRequestProperty("header3", "apple=orange");
+                    connection.setConnectTimeout(30000);
+                    connection.setReadTimeout(30000);
+
+                    sendRequest(connection, requestSent);
+                } catch (final Throwable t) {
+                    // Do nothing as HandleHttpRequest doesn't respond normally
                 }
-            });
-            httpThread.start();
-
-            while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
-                // process the request.
-                runner.run(1, false, false);
             }
+        });
 
-            runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
-            assertEquals(1, contextMap.size());
-
-            final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
-            mff.assertAttributeEquals("http.query.param.query", "true");
-            mff.assertAttributeEquals("http.query.param.value1", "value1");
-            mff.assertAttributeEquals("http.query.param.value2", "");
-            mff.assertAttributeEquals("http.query.param.value3", "");
-            mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
-            mff.assertAttributeEquals("http.headers.header1", "value1");
-            mff.assertAttributeEquals("http.headers.header3", "apple=orange");
-        } finally {
-            // shut down the server
-            runner.run(1, true);
-        }
-    }
+        httpThread.start();
+        runner.run(1, false);
 
+        runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
+        assertEquals(1, contextMap.size());
+
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("http.query.param.query", "true");
+        mff.assertAttributeEquals("http.query.param.value1", "value1");
+        mff.assertAttributeEquals("http.query.param.value2", "");
+        mff.assertAttributeEquals("http.query.param.value3", "");
+        mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
+        mff.assertAttributeEquals("http.headers.header1", "value1");
+        mff.assertAttributeEquals("http.headers.header3", "apple=orange");
+    }
 
     @Test(timeout=30000)
     public void testMultipartFormDataRequest() throws InitializationException, MalformedURLException, IOException, InterruptedException {
-      final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
-      runner.setProperty(HandleHttpRequest.PORT, "0");
+        CountDownLatch serverReady = new CountDownLatch(1);
+        CountDownLatch requestSent = new CountDownLatch(1);
 
-      final MockHttpContextMap contextMap = new MockHttpContextMap();
-      runner.addControllerService("http-context-map", contextMap);
-      runner.enableControllerService(contextMap);
-      runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+        processor = createProcessor(serverReady, requestSent);
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(HandleHttpRequest.PORT, "0");
+
+        final MockHttpContextMap contextMap = new MockHttpContextMap();
+        runner.addControllerService("http-context-map", contextMap);
+        runner.enableControllerService(contextMap);
+        runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
 
-      // trigger processor to stop but not shutdown.
-      runner.run(1, false);
-      try {
         final Thread httpThread = new Thread(new Runnable() {
           @Override
           public void run() {
             try {
+              serverReady.await();
 
               final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
 
@@ -202,21 +217,15 @@ public class ITestHandleHttpRequest {
                     .writeTimeout(3000, TimeUnit.MILLISECONDS)
                   .build();
 
-              try (Response response = client.newCall(request).execute()) {
-                Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful());
-              }
-            } catch (final Throwable t) {
-              t.printStackTrace();
-              Assert.fail(t.toString());
+                sendRequest(client, request, requestSent);
+            } catch (Exception e) {
+                // Do nothing as HandleHttpRequest doesn't respond normally
             }
           }
         });
-        httpThread.start();
 
-        while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
-          // process the request.
-          runner.run(1, false, false);
-        }
+        httpThread.start();
+        runner.run(1, false, false);
 
         runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 5);
         assertEquals(1, contextMap.size());
@@ -275,31 +284,30 @@ public class ITestHandleHttpRequest {
         mff.assertAttributeExists("http.multipart.fragments.sequence.number");
         mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
         mff.assertAttributeExists("http.headers.multipart.content-disposition");
-      } finally {
-        // shut down the server
-        runner.run(1, true);
-      }
     }
 
     @Test(timeout=30000)
     public void testMultipartFormDataRequestFailToRegisterContext() throws InitializationException, MalformedURLException, IOException, InterruptedException {
-      final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
-      runner.setProperty(HandleHttpRequest.PORT, "0");
-
-      final MockHttpContextMap contextMap = new MockHttpContextMap();
-      contextMap.setRegisterSuccessfully(false);
-      runner.addControllerService("http-context-map", contextMap);
-      runner.enableControllerService(contextMap);
-      runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
-
-      // trigger processor to stop but not shutdown.
-      runner.run(1, false);
-      try {
+        CountDownLatch serverReady = new CountDownLatch(1);
+        CountDownLatch requestSent = new CountDownLatch(1);
+        CountDownLatch resultReady = new CountDownLatch(1);
+
+        processor = createProcessor(serverReady, requestSent);
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(HandleHttpRequest.PORT, "0");
+
+        final MockHttpContextMap contextMap = new MockHttpContextMap();
+        contextMap.setRegisterSuccessfully(false);
+        runner.addControllerService("http-context-map", contextMap);
+        runner.enableControllerService(contextMap);
+        runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+
         AtomicInteger responseCode = new AtomicInteger(0);
         final Thread httpThread = new Thread(new Runnable() {
           @Override
           public void run() {
             try {
+              serverReady.await();
 
               final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
 
@@ -322,29 +330,32 @@ public class ITestHandleHttpRequest {
                     .writeTimeout(20000, TimeUnit.MILLISECONDS)
                   .build();
 
-              try (Response response = client.newCall(request).execute()) {
-                responseCode.set(response.code());
-              }
+                Callback callback = new Callback() {
+                    @Override
+                    public void onFailure(Call call, IOException e) {
+                        // Not going to happen
+                    }
+
+                    @Override
+                    public void onResponse(Call call, Response response) throws IOException {
+                        responseCode.set(response.code());
+                        resultReady.countDown();
+                    }
+                };
+                sendRequest(client, request, callback, requestSent);
             } catch (final Throwable t) {
-              t.printStackTrace();
-              Assert.fail(t.toString());
+                // Do nothing as HandleHttpRequest doesn't respond normally
             }
           }
         });
-        httpThread.start();
 
-        while (responseCode.get() == 0) {
-          // process the request.
-          runner.run(1, false, false);
-        }
+        httpThread.start();
+        runner.run(1, false, false);
+        resultReady.await();
 
         runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 0);
         assertEquals(0, contextMap.size());
         Assert.assertEquals(503, responseCode.get());
-      } finally {
-        // shut down the server
-        runner.run(1, true);
-      }
     }
 
     private byte[] generateRandomBinaryData(int i) {
@@ -373,7 +384,12 @@ public class ITestHandleHttpRequest {
 
     @Test(timeout=30000)
     public void testFailToRegister() throws InitializationException, MalformedURLException, IOException, InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+        CountDownLatch serverReady = new CountDownLatch(1);
+        CountDownLatch requestSent = new CountDownLatch(1);
+        CountDownLatch resultReady = new CountDownLatch(1);
+
+        processor = createProcessor(serverReady, requestSent);
+        final TestRunner runner = TestRunners.newTestRunner(processor);
         runner.setProperty(HandleHttpRequest.PORT, "0");
 
         final MockHttpContextMap contextMap = new MockHttpContextMap();
@@ -382,70 +398,160 @@ public class ITestHandleHttpRequest {
         runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
         contextMap.setRegisterSuccessfully(false);
 
-        // trigger processor to stop but not shutdown.
-        runner.run(1, false);
-        try {
-            final int[] responseCode = new int[1];
-            responseCode[0] = 0;
-            final Thread httpThread = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    HttpURLConnection connection = null;
-                    try {
-                        final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
-                        connection = (HttpURLConnection) new URL("http://localhost:"
-                                + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
-                        connection.setDoOutput(false);
-                        connection.setRequestMethod("GET");
-                        connection.setRequestProperty("header1", "value1");
-                        connection.setRequestProperty("header2", "");
-                        connection.setRequestProperty("header3", "apple=orange");
-                        connection.setConnectTimeout(20000);
-                        connection.setReadTimeout(20000);
-
-                        StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
-                    } catch (final Throwable t) {
-                        t.printStackTrace();
-                        if(connection != null ) {
-                            try {
-                                responseCode[0] = connection.getResponseCode();
-                            } catch (IOException e) {
-                                responseCode[0] = -1;
-                            }
-                        } else {
-                            responseCode[0] = -2;
+        final int[] responseCode = new int[1];
+        responseCode[0] = 0;
+        final Thread httpThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                HttpURLConnection connection = null;
+                try {
+                    serverReady.await();
+
+                    final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+                    connection = (HttpURLConnection) new URL("http://localhost:"
+                            + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
+                    connection.setDoOutput(false);
+                    connection.setRequestMethod("GET");
+                    connection.setRequestProperty("header1", "value1");
+                    connection.setRequestProperty("header2", "");
+                    connection.setRequestProperty("header3", "apple=orange");
+                    connection.setConnectTimeout(20000);
+                    connection.setReadTimeout(20000);
+
+                    sendRequest(connection, requestSent);
+                } catch (final Throwable t) {
+                    if(connection != null ) {
+                        try {
+                            responseCode[0] = connection.getResponseCode();
+                        } catch (IOException e) {
+                            responseCode[0] = -1;
                         }
+                    } else {
+                        responseCode[0] = -2;
                     }
+                } finally {
+                    resultReady.countDown();
+                }
+            }
+        });
+
+        httpThread.start();
+        runner.run(1, false, false);
+        resultReady.await();
+
+        runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0);
+        assertEquals(503, responseCode[0]);
+    }
+
+    @Test
+    public void testCleanup() throws Exception {
+        // GIVEN
+        int nrOfRequests = 5;
+
+        CountDownLatch serverReady = new CountDownLatch(1);
+        CountDownLatch requestSent = new CountDownLatch(nrOfRequests);
+        CountDownLatch cleanupDone = new CountDownLatch(nrOfRequests-1);
+
+        processor = new HandleHttpRequest() {
+            @Override
+            synchronized void initializeServer(ProcessContext context) throws Exception {
+                super.initializeServer(context);
+                serverReady.countDown();
+
+                requestSent.await();
+                while (getRequestQueueSize() < nrOfRequests) {
+                    Thread.sleep(200);
                 }
-            });
-            httpThread.start();
+            }
+        };
+
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(HandleHttpRequest.PORT, "0");
 
-            while (responseCode[0] == 0) {
-                // process the request.
-                runner.run(1, false, false);
+        final MockHttpContextMap contextMap = new MockHttpContextMap();
+        runner.addControllerService("http-context-map", contextMap);
+        runner.enableControllerService(contextMap);
+        runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+
+        List<Response> responses = new ArrayList<>(nrOfRequests);
+        final Thread httpThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    serverReady.await();
+
+                    final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+
+                    OkHttpClient client =
+                        new OkHttpClient.Builder()
+                            .readTimeout(3000, TimeUnit.MILLISECONDS)
+                            .writeTimeout(3000, TimeUnit.MILLISECONDS)
+                            .build();
+                    client.dispatcher().setMaxRequests(nrOfRequests);
+                    client.dispatcher().setMaxRequestsPerHost(nrOfRequests);
+
+                    Callback callback = new Callback() {
+                        @Override
+                        public void onFailure(Call call, IOException e) {
+                            // Will only happen once for the first non-rejected request, but not important
+                        }
+
+                        @Override
+                        public void onResponse(Call call, Response response) throws IOException {
+                            responses.add(response);
+                            cleanupDone.countDown();
+                        }
+                    };
+                    IntStream.rangeClosed(1, nrOfRequests).forEach(
+                        requestCounter -> {
+                            Request request = new Request.Builder()
+                                .url(String.format("http://localhost:%s/my/" + requestCounter , port))
+                                .get()
+                                .build();
+                            sendRequest(client, request, callback, requestSent);
+                        }
+                    );
+                } catch (final Throwable t) {
+                    // Do nothing as HandleHttpRequest doesn't respond normally
+                }
             }
+        });
 
-            runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0);
-            assertEquals(503, responseCode[0]);
+        // WHEN
+        httpThread.start();
+        runner.run(1, false);
+        cleanupDone.await();
+
+        // THEN
+        int nrOfPendingRequests = processor.getRequestQueueSize();
+
+        runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
 
-        } finally {
-            // shut down the server
-            runner.run(1, true);
+        assertEquals(1, contextMap.size());
+        assertEquals(0, nrOfPendingRequests);
+        assertEquals(responses.size(), nrOfRequests-1);
+        for (Response response : responses) {
+            assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, response.code());
+            assertTrue("Unexpected HTTP response for rejected requests", new String(response.body().bytes()).contains("Processor is shutting down"));
         }
     }
 
     @Test
-    public void testSecure() throws InitializationException {
+    public void testSecure() throws Exception {
         secureTest(false);
     }
 
     @Test
-    public void testSecureTwoWaySsl() throws InitializationException {
+    public void testSecureTwoWaySsl() throws Exception {
         secureTest(true);
     }
 
-    private void secureTest(boolean twoWaySsl) throws InitializationException {
-        final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+    private void secureTest(boolean twoWaySsl) throws Exception {
+        CountDownLatch serverReady = new CountDownLatch(1);
+        CountDownLatch requestSent = new CountDownLatch(1);
+
+        processor = createProcessor(serverReady, requestSent);
+        final TestRunner runner = TestRunners.newTestRunner(processor);
         runner.setProperty(HandleHttpRequest.PORT, "0");
 
         final MockHttpContextMap contextMap = new MockHttpContextMap();
@@ -458,76 +564,117 @@ public class ITestHandleHttpRequest {
         sslProperties.put(StandardSSLContextService.SSL_ALGORITHM.getName(), "TLSv1.2");
         useSSLContextService(runner, sslProperties, twoWaySsl ? SSLContextService.ClientAuth.WANT : SSLContextService.ClientAuth.NONE);
 
-        // trigger processor to stop but not shutdown.
-        runner.run(1, false);
-        try {
-            final Thread httpThread = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
-                        final HttpsURLConnection connection = (HttpsURLConnection) new URL("https://localhost:"
-                                + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
-
-                        if (twoWaySsl) {
-                            // use a client certificate, do not reuse the server's keystore
-                            SSLContext clientSslContext = SslContextFactory.createSslContext(
-                                    getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE.getName()),
-                                    getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()).toCharArray(),
-                                    "JKS",
-                                    getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()),
-                                    getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(),
-                                    "JKS",
-                                    null,
-                                    "TLSv1.2");
-                            connection.setSSLSocketFactory(clientSslContext.getSocketFactory());
-                        } else {
-                            // with one-way SSL, the client still needs a truststore
-                            SSLContext clientSslContext = SslContextFactory.createTrustSslContext(
-                                    getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()),
-                                    getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(),
-                                    "JKS",
-                                    "TLSv1.2");
-                            connection.setSSLSocketFactory(clientSslContext.getSocketFactory());
-                        }
-                        connection.setDoOutput(false);
-                        connection.setRequestMethod("GET");
-                        connection.setRequestProperty("header1", "value1");
-                        connection.setRequestProperty("header2", "");
-                        connection.setRequestProperty("header3", "apple=orange");
-                        connection.setConnectTimeout(3000);
-                        connection.setReadTimeout(3000);
-
-                        StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
-                    } catch (final Throwable t) {
-                        t.printStackTrace();
-                        Assert.fail(t.toString());
+        final Thread httpThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    serverReady.await();
+
+                    final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+                    final HttpsURLConnection connection = (HttpsURLConnection) new URL("https://localhost:"
+                            + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
+
+                    if (twoWaySsl) {
+                        // use a client certificate, do not reuse the server's keystore
+                        SSLContext clientSslContext = SslContextFactory.createSslContext(
+                                getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE.getName()),
+                                getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()).toCharArray(),
+                                "JKS",
+                                getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()),
+                                getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(),
+                                "JKS",
+                                null,
+                                "TLSv1.2");
+                        connection.setSSLSocketFactory(clientSslContext.getSocketFactory());
+                    } else {
+                        // with one-way SSL, the client still needs a truststore
+                        SSLContext clientSslContext = SslContextFactory.createTrustSslContext(
+                                getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()),
+                                getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(),
+                                "JKS",
+                                "TLSv1.2");
+                        connection.setSSLSocketFactory(clientSslContext.getSocketFactory());
                     }
+                    connection.setDoOutput(false);
+                    connection.setRequestMethod("GET");
+                    connection.setRequestProperty("header1", "value1");
+                    connection.setRequestProperty("header2", "");
+                    connection.setRequestProperty("header3", "apple=orange");
+                    connection.setConnectTimeout(3000);
+                    connection.setReadTimeout(3000);
+
+                    sendRequest(connection, requestSent);
+                } catch (final Throwable t) {
+                    // Do nothing as HandleHttpRequest doesn't respond normally
                 }
-            });
-            httpThread.start();
+            }
+        });
+
+        httpThread.start();
+        runner.run(1, false, false);
 
-            while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
-                // process the request.
-                runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
+        assertEquals(1, contextMap.size());
+
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("http.query.param.query", "true");
+        mff.assertAttributeEquals("http.query.param.value1", "value1");
+        mff.assertAttributeEquals("http.query.param.value2", "");
+        mff.assertAttributeEquals("http.query.param.value3", "");
+        mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
+        mff.assertAttributeEquals("http.headers.header1", "value1");
+        mff.assertAttributeEquals("http.headers.header3", "apple=orange");
+        mff.assertAttributeEquals("http.protocol", "HTTP/1.1");
+    }
+
+    private HandleHttpRequest createProcessor(CountDownLatch serverReady, CountDownLatch requestSent) {
+        return new HandleHttpRequest() {
+            @Override
+            synchronized void initializeServer(ProcessContext context) throws Exception {
+                super.initializeServer(context);
+                serverReady.countDown();
+
+                requestSent.await();
+                while (getRequestQueueSize()  == 0) {
+                    Thread.sleep(200);
+                }
             }
 
-            runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
-            assertEquals(1, contextMap.size());
-
-            final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
-            mff.assertAttributeEquals("http.query.param.query", "true");
-            mff.assertAttributeEquals("http.query.param.value1", "value1");
-            mff.assertAttributeEquals("http.query.param.value2", "");
-            mff.assertAttributeEquals("http.query.param.value3", "");
-            mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
-            mff.assertAttributeEquals("http.headers.header1", "value1");
-            mff.assertAttributeEquals("http.headers.header3", "apple=orange");
-            mff.assertAttributeEquals("http.protocol", "HTTP/1.1");
-        } finally {
-            // shut down the server
-            runner.run(1, true);
-        }
+            @Override
+            void rejectPendingRequests() {
+                // Skip this, otherwise it would wait to make sure there are no more requests
+            }
+        };
+    }
+
+    private void sendRequest(HttpURLConnection connection, CountDownLatch requestSent) throws Exception {
+        // Read the response on a worker thread so requestSent can be released while the call is still pending.
+        java.util.concurrent.ExecutorService worker = Executors.newSingleThreadExecutor();
+        Future<InputStream> executionFuture = worker.submit(() -> connection.getInputStream());
+        worker.shutdown(); // the submitted task still runs; without this the worker thread is never released
+        requestSent.countDown();
+        executionFuture.get(); // block until the read finishes; a failure surfaces as ExecutionException
+    }
+
+    private void sendRequest(OkHttpClient client, Request request, CountDownLatch requestSent) {
+        Callback callback = new Callback() {
+            @Override
+            public void onFailure(Call call, IOException e) {
+                // We (may) get a timeout as the processor doesn't answer unless there is some kind of error
+            }
+
+            @Override
+            public void onResponse(Call call, Response response) throws IOException {
+                // Not called as the processor doesn't answer unless there is some kind of error
+            }
+        };
+
+        sendRequest(client, request, callback, requestSent);
+    }
+
+    private void sendRequest(OkHttpClient client, Request request, Callback callback, CountDownLatch requestSent) {
+        client.newCall(request).enqueue(callback); // hand the call to the client; its outcome is reported to callback
+        requestSent.countDown(); // release anyone waiting on requestSent now that the call has been enqueued
     }
 
     private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {