You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/04/07 18:14:40 UTC
[nifi] branch master updated: NIFI-7314 HandleHttpRequest stops
Jetty in OnUnscheduled instead of OnStopped. Also reject pending request
and clean their queue when shutting down.
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 09cece8 NIFI-7314 HandleHttpRequest stops Jetty in OnUnscheduled instead of OnStopped. Also reject pending request and clean their queue when shutting down.
09cece8 is described below
commit 09cece8e9936f7d8e6662e6f887444e85a36f79f
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Tue Apr 7 15:35:20 2020 +0200
NIFI-7314 HandleHttpRequest stops Jetty in OnUnscheduled instead of OnStopped. Also reject pending request and clean their queue when shutting down.
NIFI-7314 In HandleHttpRequest returning 503 when rejecting pending requests before shutdown.
NIFI-7314 In HandleHttpRequest add logs and better response message during cleanup.
This closes #4191.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../processors/standard/HandleHttpRequest.java | 46 +-
.../standard/ITestHandleHttpRequest.java | 567 +++++++++++++--------
2 files changed, 400 insertions(+), 213 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index 3717a33..66b940a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -25,7 +25,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
@@ -304,6 +304,7 @@ public class HandleHttpRequest extends AbstractProcessor {
}
private volatile Server server;
+ private volatile boolean ready;
private AtomicBoolean initialized = new AtomicBoolean(false);
private volatile BlockingQueue<HttpRequestContainer> containerQueue;
private AtomicBoolean runOnPrimary = new AtomicBoolean(false);
@@ -323,7 +324,7 @@ public class HandleHttpRequest extends AbstractProcessor {
initialized.set(false);
}
- private synchronized void initializeServer(final ProcessContext context) throws Exception {
+ synchronized void initializeServer(final ProcessContext context) throws Exception {
if(initialized.get()){
return;
}
@@ -461,6 +462,12 @@ public class HandleHttpRequest extends AbstractProcessor {
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor queue is full");
return;
+ } else if (!ready) {
+ getLogger().warn("Request from {} cannot be processed, processor is being shut down; responding with SERVICE_UNAVAILABLE",
+ new Object[]{request.getRemoteAddr()});
+
+ response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down");
+ return;
}
// Right now, that information, though, is only in the ProcessSession, not the ProcessContext,
@@ -491,6 +498,7 @@ public class HandleHttpRequest extends AbstractProcessor {
getLogger().info("Server started and listening on port " + getPort());
initialized.set(true);
+ ready = true;
}
protected int getPort() {
@@ -535,10 +543,13 @@ public class HandleHttpRequest extends AbstractProcessor {
return sslFactory;
}
- @OnStopped
+ @OnUnscheduled
public void shutdown() throws Exception {
+ ready = false;
+
if (server != null) {
getLogger().debug("Shutting down server");
+ rejectPendingRequests();
server.stop();
server.destroy();
server.join();
@@ -547,6 +558,35 @@ public class HandleHttpRequest extends AbstractProcessor {
}
}
+ void rejectPendingRequests() {
+ HttpRequestContainer container;
+ while ((container = getNextContainer()) != null) {
+ try {
+ getLogger().warn("Rejecting request from {} during cleanup after processor shutdown; responding with SERVICE_UNAVAILABLE",
+ new Object[]{container.getRequest().getRemoteAddr()});
+
+ HttpServletResponse response = container.getResponse();
+ response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down");
+ container.getContext().complete();
+ } catch (final IOException e) {
+ getLogger().warn("Failed to send HTTP response to {} due to {}",
+ new Object[]{container.getRequest().getRemoteAddr(), e});
+ }
+ }
+ }
+
+ private HttpRequestContainer getNextContainer() {
+ HttpRequestContainer container;
+ try {
+ container = containerQueue.poll(2, TimeUnit.SECONDS);
+ } catch (final InterruptedException e) {
+ getLogger().warn("Interrupted while polling for " + HttpRequestContainer.class.getSimpleName() + " during cleanup.");
+ container = null;
+ }
+
+ return container;
+ }
+
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
if (runOnPrimary.get() && newState.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
index f65c0b9..391d22d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
@@ -20,6 +20,8 @@ import com.google.api.client.util.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
+import okhttp3.Call;
+import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.OkHttpClient;
@@ -28,17 +30,17 @@ import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.http.HttpContextMap;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -49,22 +51,31 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class ITestHandleHttpRequest {
+ private HandleHttpRequest processor;
+
private static Map<String, String> getTruststoreProperties() {
final Map<String, String> props = new HashMap<>();
props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/truststore.jks");
@@ -103,9 +114,21 @@ public class ITestHandleHttpRequest {
return service.createSSLContext(clientAuth);
}
+ @After
+ public void tearDown() throws Exception {
+ if (processor != null) {
+ processor.shutdown();
+ }
+ }
+
@Test(timeout=30000)
public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException, InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+ CountDownLatch serverReady = new CountDownLatch(1);
+ CountDownLatch requestSent = new CountDownLatch(1);
+
+ processor = createProcessor(serverReady, requestSent);
+
+ final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(HandleHttpRequest.PORT, "0");
final MockHttpContextMap contextMap = new MockHttpContextMap();
@@ -113,73 +136,65 @@ public class ITestHandleHttpRequest {
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
- // trigger processor to stop but not shutdown.
- runner.run(1, false);
- try {
- final Thread httpThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
- final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:"
- + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
- connection.setDoOutput(false);
- connection.setRequestMethod("GET");
- connection.setRequestProperty("header1", "value1");
- connection.setRequestProperty("header2", "");
- connection.setRequestProperty("header3", "apple=orange");
- connection.setConnectTimeout(3000);
- connection.setReadTimeout(3000);
-
- StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
- } catch (final Throwable t) {
- t.printStackTrace();
- Assert.fail(t.toString());
- }
+ final Thread httpThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ serverReady.await();
+ final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+ final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:"
+ + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
+
+ connection.setDoOutput(false);
+ connection.setRequestMethod("GET");
+ connection.setRequestProperty("header1", "value1");
+ connection.setRequestProperty("header2", "");
+ connection.setRequestProperty("header3", "apple=orange");
+ connection.setConnectTimeout(30000);
+ connection.setReadTimeout(30000);
+
+ sendRequest(connection, requestSent);
+ } catch (final Throwable t) {
+ // Do nothing as HandleHttpRequest doesn't respond normally
}
- });
- httpThread.start();
-
- while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
- // process the request.
- runner.run(1, false, false);
}
+ });
- runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
- assertEquals(1, contextMap.size());
-
- final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
- mff.assertAttributeEquals("http.query.param.query", "true");
- mff.assertAttributeEquals("http.query.param.value1", "value1");
- mff.assertAttributeEquals("http.query.param.value2", "");
- mff.assertAttributeEquals("http.query.param.value3", "");
- mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
- mff.assertAttributeEquals("http.headers.header1", "value1");
- mff.assertAttributeEquals("http.headers.header3", "apple=orange");
- } finally {
- // shut down the server
- runner.run(1, true);
- }
- }
+ httpThread.start();
+ runner.run(1, false);
+ runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
+ assertEquals(1, contextMap.size());
+
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
+ mff.assertAttributeEquals("http.query.param.query", "true");
+ mff.assertAttributeEquals("http.query.param.value1", "value1");
+ mff.assertAttributeEquals("http.query.param.value2", "");
+ mff.assertAttributeEquals("http.query.param.value3", "");
+ mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
+ mff.assertAttributeEquals("http.headers.header1", "value1");
+ mff.assertAttributeEquals("http.headers.header3", "apple=orange");
+ }
@Test(timeout=30000)
public void testMultipartFormDataRequest() throws InitializationException, MalformedURLException, IOException, InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
- runner.setProperty(HandleHttpRequest.PORT, "0");
+ CountDownLatch serverReady = new CountDownLatch(1);
+ CountDownLatch requestSent = new CountDownLatch(1);
- final MockHttpContextMap contextMap = new MockHttpContextMap();
- runner.addControllerService("http-context-map", contextMap);
- runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+ processor = createProcessor(serverReady, requestSent);
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(HandleHttpRequest.PORT, "0");
+
+ final MockHttpContextMap contextMap = new MockHttpContextMap();
+ runner.addControllerService("http-context-map", contextMap);
+ runner.enableControllerService(contextMap);
+ runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
- // trigger processor to stop but not shutdown.
- runner.run(1, false);
- try {
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
try {
+ serverReady.await();
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
@@ -202,21 +217,15 @@ public class ITestHandleHttpRequest {
.writeTimeout(3000, TimeUnit.MILLISECONDS)
.build();
- try (Response response = client.newCall(request).execute()) {
- Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful());
- }
- } catch (final Throwable t) {
- t.printStackTrace();
- Assert.fail(t.toString());
+ sendRequest(client, request, requestSent);
+ } catch (Exception e) {
+ // Do nothing as HandleHttpRequest doesn't respond normally
}
}
});
- httpThread.start();
- while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
- // process the request.
- runner.run(1, false, false);
- }
+ httpThread.start();
+ runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 5);
assertEquals(1, contextMap.size());
@@ -275,31 +284,30 @@ public class ITestHandleHttpRequest {
mff.assertAttributeExists("http.multipart.fragments.sequence.number");
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
mff.assertAttributeExists("http.headers.multipart.content-disposition");
- } finally {
- // shut down the server
- runner.run(1, true);
- }
}
@Test(timeout=30000)
public void testMultipartFormDataRequestFailToRegisterContext() throws InitializationException, MalformedURLException, IOException, InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
- runner.setProperty(HandleHttpRequest.PORT, "0");
-
- final MockHttpContextMap contextMap = new MockHttpContextMap();
- contextMap.setRegisterSuccessfully(false);
- runner.addControllerService("http-context-map", contextMap);
- runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
-
- // trigger processor to stop but not shutdown.
- runner.run(1, false);
- try {
+ CountDownLatch serverReady = new CountDownLatch(1);
+ CountDownLatch requestSent = new CountDownLatch(1);
+ CountDownLatch resultReady = new CountDownLatch(1);
+
+ processor = createProcessor(serverReady, requestSent);
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(HandleHttpRequest.PORT, "0");
+
+ final MockHttpContextMap contextMap = new MockHttpContextMap();
+ contextMap.setRegisterSuccessfully(false);
+ runner.addControllerService("http-context-map", contextMap);
+ runner.enableControllerService(contextMap);
+ runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+
AtomicInteger responseCode = new AtomicInteger(0);
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
try {
+ serverReady.await();
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
@@ -322,29 +330,32 @@ public class ITestHandleHttpRequest {
.writeTimeout(20000, TimeUnit.MILLISECONDS)
.build();
- try (Response response = client.newCall(request).execute()) {
- responseCode.set(response.code());
- }
+ Callback callback = new Callback() {
+ @Override
+ public void onFailure(Call call, IOException e) {
+ // Not going to happen
+ }
+
+ @Override
+ public void onResponse(Call call, Response response) throws IOException {
+ responseCode.set(response.code());
+ resultReady.countDown();
+ }
+ };
+ sendRequest(client, request, callback, requestSent);
} catch (final Throwable t) {
- t.printStackTrace();
- Assert.fail(t.toString());
+ // Do nothing as HandleHttpRequest doesn't respond normally
}
}
});
- httpThread.start();
- while (responseCode.get() == 0) {
- // process the request.
- runner.run(1, false, false);
- }
+ httpThread.start();
+ runner.run(1, false, false);
+ resultReady.await();
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 0);
assertEquals(0, contextMap.size());
Assert.assertEquals(503, responseCode.get());
- } finally {
- // shut down the server
- runner.run(1, true);
- }
}
private byte[] generateRandomBinaryData(int i) {
@@ -373,7 +384,12 @@ public class ITestHandleHttpRequest {
@Test(timeout=30000)
public void testFailToRegister() throws InitializationException, MalformedURLException, IOException, InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+ CountDownLatch serverReady = new CountDownLatch(1);
+ CountDownLatch requestSent = new CountDownLatch(1);
+ CountDownLatch resultReady = new CountDownLatch(1);
+
+ processor = createProcessor(serverReady, requestSent);
+ final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(HandleHttpRequest.PORT, "0");
final MockHttpContextMap contextMap = new MockHttpContextMap();
@@ -382,70 +398,160 @@ public class ITestHandleHttpRequest {
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
contextMap.setRegisterSuccessfully(false);
- // trigger processor to stop but not shutdown.
- runner.run(1, false);
- try {
- final int[] responseCode = new int[1];
- responseCode[0] = 0;
- final Thread httpThread = new Thread(new Runnable() {
- @Override
- public void run() {
- HttpURLConnection connection = null;
- try {
- final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
- connection = (HttpURLConnection) new URL("http://localhost:"
- + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
- connection.setDoOutput(false);
- connection.setRequestMethod("GET");
- connection.setRequestProperty("header1", "value1");
- connection.setRequestProperty("header2", "");
- connection.setRequestProperty("header3", "apple=orange");
- connection.setConnectTimeout(20000);
- connection.setReadTimeout(20000);
-
- StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
- } catch (final Throwable t) {
- t.printStackTrace();
- if(connection != null ) {
- try {
- responseCode[0] = connection.getResponseCode();
- } catch (IOException e) {
- responseCode[0] = -1;
- }
- } else {
- responseCode[0] = -2;
+ final int[] responseCode = new int[1];
+ responseCode[0] = 0;
+ final Thread httpThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ HttpURLConnection connection = null;
+ try {
+ serverReady.await();
+
+ final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+ connection = (HttpURLConnection) new URL("http://localhost:"
+ + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
+ connection.setDoOutput(false);
+ connection.setRequestMethod("GET");
+ connection.setRequestProperty("header1", "value1");
+ connection.setRequestProperty("header2", "");
+ connection.setRequestProperty("header3", "apple=orange");
+ connection.setConnectTimeout(20000);
+ connection.setReadTimeout(20000);
+
+ sendRequest(connection, requestSent);
+ } catch (final Throwable t) {
+ if(connection != null ) {
+ try {
+ responseCode[0] = connection.getResponseCode();
+ } catch (IOException e) {
+ responseCode[0] = -1;
}
+ } else {
+ responseCode[0] = -2;
}
+ } finally {
+ resultReady.countDown();
+ }
+ }
+ });
+
+ httpThread.start();
+ runner.run(1, false, false);
+ resultReady.await();
+
+ runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0);
+ assertEquals(503, responseCode[0]);
+ }
+
+ @Test
+ public void testCleanup() throws Exception {
+ // GIVEN
+ int nrOfRequests = 5;
+
+ CountDownLatch serverReady = new CountDownLatch(1);
+ CountDownLatch requestSent = new CountDownLatch(nrOfRequests);
+ CountDownLatch cleanupDone = new CountDownLatch(nrOfRequests-1);
+
+ processor = new HandleHttpRequest() {
+ @Override
+ synchronized void initializeServer(ProcessContext context) throws Exception {
+ super.initializeServer(context);
+ serverReady.countDown();
+
+ requestSent.await();
+ while (getRequestQueueSize() < nrOfRequests) {
+ Thread.sleep(200);
}
- });
- httpThread.start();
+ }
+ };
+
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(HandleHttpRequest.PORT, "0");
- while (responseCode[0] == 0) {
- // process the request.
- runner.run(1, false, false);
+ final MockHttpContextMap contextMap = new MockHttpContextMap();
+ runner.addControllerService("http-context-map", contextMap);
+ runner.enableControllerService(contextMap);
+ runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+
+ List<Response> responses = new ArrayList<>(nrOfRequests);
+ final Thread httpThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ serverReady.await();
+
+ final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+
+ OkHttpClient client =
+ new OkHttpClient.Builder()
+ .readTimeout(3000, TimeUnit.MILLISECONDS)
+ .writeTimeout(3000, TimeUnit.MILLISECONDS)
+ .build();
+ client.dispatcher().setMaxRequests(nrOfRequests);
+ client.dispatcher().setMaxRequestsPerHost(nrOfRequests);
+
+ Callback callback = new Callback() {
+ @Override
+ public void onFailure(Call call, IOException e) {
+ // Will only happen once for the first non-rejected request, but not important
+ }
+
+ @Override
+ public void onResponse(Call call, Response response) throws IOException {
+ responses.add(response);
+ cleanupDone.countDown();
+ }
+ };
+ IntStream.rangeClosed(1, nrOfRequests).forEach(
+ requestCounter -> {
+ Request request = new Request.Builder()
+ .url(String.format("http://localhost:%s/my/" + requestCounter , port))
+ .get()
+ .build();
+ sendRequest(client, request, callback, requestSent);
+ }
+ );
+ } catch (final Throwable t) {
+ // Do nothing as HandleHttpRequest doesn't respond normally
+ }
}
+ });
- runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0);
- assertEquals(503, responseCode[0]);
+ // WHEN
+ httpThread.start();
+ runner.run(1, false);
+ cleanupDone.await();
+
+ // THEN
+ int nrOfPendingRequests = processor.getRequestQueueSize();
+
+ runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
- } finally {
- // shut down the server
- runner.run(1, true);
+ assertEquals(1, contextMap.size());
+ assertEquals(0, nrOfPendingRequests);
+ assertEquals(responses.size(), nrOfRequests-1);
+ for (Response response : responses) {
+ assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, response.code());
+ assertTrue("Unexpected HTTP response for rejected requests", new String(response.body().bytes()).contains("Processor is shutting down"));
}
}
@Test
- public void testSecure() throws InitializationException {
+ public void testSecure() throws Exception {
secureTest(false);
}
@Test
- public void testSecureTwoWaySsl() throws InitializationException {
+ public void testSecureTwoWaySsl() throws Exception {
secureTest(true);
}
- private void secureTest(boolean twoWaySsl) throws InitializationException {
- final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+ private void secureTest(boolean twoWaySsl) throws Exception {
+ CountDownLatch serverReady = new CountDownLatch(1);
+ CountDownLatch requestSent = new CountDownLatch(1);
+
+ processor = createProcessor(serverReady, requestSent);
+ final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(HandleHttpRequest.PORT, "0");
final MockHttpContextMap contextMap = new MockHttpContextMap();
@@ -458,76 +564,117 @@ public class ITestHandleHttpRequest {
sslProperties.put(StandardSSLContextService.SSL_ALGORITHM.getName(), "TLSv1.2");
useSSLContextService(runner, sslProperties, twoWaySsl ? SSLContextService.ClientAuth.WANT : SSLContextService.ClientAuth.NONE);
- // trigger processor to stop but not shutdown.
- runner.run(1, false);
- try {
- final Thread httpThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
- final HttpsURLConnection connection = (HttpsURLConnection) new URL("https://localhost:"
- + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
-
- if (twoWaySsl) {
- // use a client certificate, do not reuse the server's keystore
- SSLContext clientSslContext = SslContextFactory.createSslContext(
- getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE.getName()),
- getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()).toCharArray(),
- "JKS",
- getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()),
- getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(),
- "JKS",
- null,
- "TLSv1.2");
- connection.setSSLSocketFactory(clientSslContext.getSocketFactory());
- } else {
- // with one-way SSL, the client still needs a truststore
- SSLContext clientSslContext = SslContextFactory.createTrustSslContext(
- getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()),
- getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(),
- "JKS",
- "TLSv1.2");
- connection.setSSLSocketFactory(clientSslContext.getSocketFactory());
- }
- connection.setDoOutput(false);
- connection.setRequestMethod("GET");
- connection.setRequestProperty("header1", "value1");
- connection.setRequestProperty("header2", "");
- connection.setRequestProperty("header3", "apple=orange");
- connection.setConnectTimeout(3000);
- connection.setReadTimeout(3000);
-
- StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
- } catch (final Throwable t) {
- t.printStackTrace();
- Assert.fail(t.toString());
+ final Thread httpThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ serverReady.await();
+
+ final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+ final HttpsURLConnection connection = (HttpsURLConnection) new URL("https://localhost:"
+ + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
+
+ if (twoWaySsl) {
+ // use a client certificate, do not reuse the server's keystore
+ SSLContext clientSslContext = SslContextFactory.createSslContext(
+ getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE.getName()),
+ getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()).toCharArray(),
+ "JKS",
+ getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()),
+ getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(),
+ "JKS",
+ null,
+ "TLSv1.2");
+ connection.setSSLSocketFactory(clientSslContext.getSocketFactory());
+ } else {
+ // with one-way SSL, the client still needs a truststore
+ SSLContext clientSslContext = SslContextFactory.createTrustSslContext(
+ getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()),
+ getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(),
+ "JKS",
+ "TLSv1.2");
+ connection.setSSLSocketFactory(clientSslContext.getSocketFactory());
}
+ connection.setDoOutput(false);
+ connection.setRequestMethod("GET");
+ connection.setRequestProperty("header1", "value1");
+ connection.setRequestProperty("header2", "");
+ connection.setRequestProperty("header3", "apple=orange");
+ connection.setConnectTimeout(3000);
+ connection.setReadTimeout(3000);
+
+ sendRequest(connection, requestSent);
+ } catch (final Throwable t) {
+ // Do nothing as HandleHttpRequest doesn't respond normally
}
- });
- httpThread.start();
+ }
+ });
+
+ httpThread.start();
+ runner.run(1, false, false);
- while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
- // process the request.
- runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
+ assertEquals(1, contextMap.size());
+
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
+ mff.assertAttributeEquals("http.query.param.query", "true");
+ mff.assertAttributeEquals("http.query.param.value1", "value1");
+ mff.assertAttributeEquals("http.query.param.value2", "");
+ mff.assertAttributeEquals("http.query.param.value3", "");
+ mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
+ mff.assertAttributeEquals("http.headers.header1", "value1");
+ mff.assertAttributeEquals("http.headers.header3", "apple=orange");
+ mff.assertAttributeEquals("http.protocol", "HTTP/1.1");
+ }
+
+ private HandleHttpRequest createProcessor(CountDownLatch serverReady, CountDownLatch requestSent) {
+ return new HandleHttpRequest() {
+ @Override
+ synchronized void initializeServer(ProcessContext context) throws Exception {
+ super.initializeServer(context);
+ serverReady.countDown();
+
+ requestSent.await();
+ while (getRequestQueueSize() == 0) {
+ Thread.sleep(200);
+ }
}
- runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
- assertEquals(1, contextMap.size());
-
- final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
- mff.assertAttributeEquals("http.query.param.query", "true");
- mff.assertAttributeEquals("http.query.param.value1", "value1");
- mff.assertAttributeEquals("http.query.param.value2", "");
- mff.assertAttributeEquals("http.query.param.value3", "");
- mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
- mff.assertAttributeEquals("http.headers.header1", "value1");
- mff.assertAttributeEquals("http.headers.header3", "apple=orange");
- mff.assertAttributeEquals("http.protocol", "HTTP/1.1");
- } finally {
- // shut down the server
- runner.run(1, true);
- }
+ @Override
+ void rejectPendingRequests() {
+ // Skip this, otherwise it would wait to make sure there are no more requests
+ }
+ };
+ }
+
+ private void sendRequest(HttpURLConnection connection, CountDownLatch requestSent) throws Exception {
+ Future<InputStream> executionFuture = Executors.newSingleThreadExecutor()
+ .submit(() -> connection.getInputStream());
+
+ requestSent.countDown();
+
+ executionFuture.get();
+ }
+
+ private void sendRequest(OkHttpClient client, Request request, CountDownLatch requestSent) {
+ Callback callback = new Callback() {
+ @Override
+ public void onFailure(Call call, IOException e) {
+ // We (may) get a timeout as the processor doesn't answer unless there is some kind of error
+ }
+
+ @Override
+ public void onResponse(Call call, Response response) throws IOException {
+ // Not called as the processor doesn't answer unless there is some kind of error
+ }
+ };
+
+ sendRequest(client, request, callback, requestSent);
+ }
+
+ private void sendRequest(OkHttpClient client, Request request, Callback callback, CountDownLatch requestSent) {
+ client.newCall(request).enqueue(callback);
+ requestSent.countDown();
}
private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {