You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2018/11/21 18:11:35 UTC
nifi git commit: NIFI-5836 This closes #3181. changed the brittle
timing based tests to integration tests
Repository: nifi
Updated Branches:
refs/heads/master 71499f774 -> 2a4745c07
NIFI-5836 This closes #3181. changed the brittle timing based tests to integration tests
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2a4745c0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2a4745c0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2a4745c0
Branch: refs/heads/master
Commit: 2a4745c07770b1c28ab0a4d0fea1be34f74f08cb
Parents: 71499f7
Author: joewitt <jo...@apache.org>
Authored: Wed Nov 21 12:26:13 2018 -0500
Committer: joewitt <jo...@apache.org>
Committed: Wed Nov 21 13:11:06 2018 -0500
----------------------------------------------------------------------
.../standard/ITestHandleHttpRequest.java | 536 +++++++++++++++++++
.../standard/TestHandleHttpRequest.java | 536 -------------------
2 files changed, 536 insertions(+), 536 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/2a4745c0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
new file mode 100644
index 0000000..1645e55
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import com.google.api.client.util.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.http.HttpContextMap;
+import org.apache.nifi.processors.standard.util.HTTPUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.servlet.AsyncContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.File;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+public class ITestHandleHttpRequest {
+
+ private static Map<String, String> getTruststoreProperties() {
+ final Map<String, String> props = new HashMap<>();
+ props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/truststore.jks");
+ props.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "passwordpassword");
+ props.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
+ return props;
+ }
+
+ private static Map<String, String> getKeystoreProperties() {
+ final Map<String, String> properties = new HashMap<>();
+ properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/keystore.jks");
+ properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "passwordpassword");
+ properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
+ return properties;
+ }
+
+ private static SSLContext useSSLContextService(final TestRunner controller, final Map<String, String> sslProperties) {
+ final SSLContextService service = new StandardRestrictedSSLContextService();
+ try {
+ controller.addControllerService("ssl-service", service, sslProperties);
+ controller.enableControllerService(service);
+ } catch (InitializationException ex) {
+ ex.printStackTrace();
+ Assert.fail("Could not create SSL Context Service");
+ }
+
+ controller.setProperty(HandleHttpRequest.SSL_CONTEXT, "ssl-service");
+ return service.createSSLContext(SSLContextService.ClientAuth.WANT);
+ }
+
+ @Test(timeout=30000)
+ public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException, InterruptedException {
+ final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+ runner.setProperty(HandleHttpRequest.PORT, "0");
+
+ final MockHttpContextMap contextMap = new MockHttpContextMap();
+ runner.addControllerService("http-context-map", contextMap);
+ runner.enableControllerService(contextMap);
+ runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+
+ // trigger processor to stop but not shutdown.
+ runner.run(1, false);
+ try {
+ final Thread httpThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+ final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:"
+ + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
+ connection.setDoOutput(false);
+ connection.setRequestMethod("GET");
+ connection.setRequestProperty("header1", "value1");
+ connection.setRequestProperty("header2", "");
+ connection.setRequestProperty("header3", "apple=orange");
+ connection.setConnectTimeout(3000);
+ connection.setReadTimeout(3000);
+
+ StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
+ } catch (final Throwable t) {
+ t.printStackTrace();
+ Assert.fail(t.toString());
+ }
+ }
+ });
+ httpThread.start();
+
+ while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
+ // process the request.
+ runner.run(1, false, false);
+ }
+
+ runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
+ assertEquals(1, contextMap.size());
+
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
+ mff.assertAttributeEquals("http.query.param.query", "true");
+ mff.assertAttributeEquals("http.query.param.value1", "value1");
+ mff.assertAttributeEquals("http.query.param.value2", "");
+ mff.assertAttributeEquals("http.query.param.value3", "");
+ mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
+ mff.assertAttributeEquals("http.headers.header1", "value1");
+ mff.assertAttributeEquals("http.headers.header3", "apple=orange");
+ } finally {
+ // shut down the server
+ runner.run(1, true);
+ }
+ }
+
+
+ @Test(timeout=30000)
+ public void testMultipartFormDataRequest() throws InitializationException, MalformedURLException, IOException, InterruptedException {
+ final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+ runner.setProperty(HandleHttpRequest.PORT, "0");
+
+ final MockHttpContextMap contextMap = new MockHttpContextMap();
+ runner.addControllerService("http-context-map", contextMap);
+ runner.enableControllerService(contextMap);
+ runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+
+ // trigger processor to stop but not shutdown.
+ runner.run(1, false);
+ try {
+ final Thread httpThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+
+ final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+
+ MultipartBody multipartBody = new MultipartBody.Builder()
+ .setType(MultipartBody.FORM)
+ .addFormDataPart("p1", "v1")
+ .addFormDataPart("p2", "v2")
+ .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+ .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }")))
+ .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+ .build();
+
+ Request request = new Request.Builder()
+ .url(String.format("http://localhost:%s/my/path", port))
+ .post(multipartBody).build();
+
+ OkHttpClient client =
+ new OkHttpClient.Builder()
+ .readTimeout(3000, TimeUnit.MILLISECONDS)
+ .writeTimeout(3000, TimeUnit.MILLISECONDS)
+ .build();
+
+ try (Response response = client.newCall(request).execute()) {
+ Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful());
+ }
+ } catch (final Throwable t) {
+ t.printStackTrace();
+ Assert.fail(t.toString());
+ }
+ }
+ });
+ httpThread.start();
+
+ while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
+ // process the request.
+ runner.run(1, false, false);
+ }
+
+ runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 5);
+ assertEquals(1, contextMap.size());
+
+ List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS);
+
+ // Part fragments are not processed in the order we submitted them.
+ // We cannot rely on the order we sent them in.
+ MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p1");
+ String contextId = mff.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
+ mff.assertAttributeEquals("http.multipart.name", "p1");
+ mff.assertAttributeExists("http.multipart.size");
+ mff.assertAttributeEquals("http.multipart.fragments.sequence.number", "1");
+ mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+ mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+
+ mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p2");
+ // each part generates a corresponding flow file - yet all parts are coming from the same request,
+ mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
+ mff.assertAttributeEquals("http.multipart.name", "p2");
+ mff.assertAttributeExists("http.multipart.size");
+ mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+ mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+ mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+
+ mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file1");
+ mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
+ mff.assertAttributeEquals("http.multipart.name", "file1");
+ mff.assertAttributeEquals("http.multipart.filename", "my-file-text.txt");
+ mff.assertAttributeEquals("http.headers.multipart.content-type", "text/plain");
+ mff.assertAttributeExists("http.multipart.size");
+ mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+ mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+ mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+
+ mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file2");
+ mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
+ mff.assertAttributeEquals("http.multipart.name", "file2");
+ mff.assertAttributeEquals("http.multipart.filename", "my-file-data.json");
+ mff.assertAttributeEquals("http.headers.multipart.content-type", "application/json");
+ mff.assertAttributeExists("http.multipart.size");
+ mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+ mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+ mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+
+ mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file3");
+ mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
+ mff.assertAttributeEquals("http.multipart.name", "file3");
+ mff.assertAttributeEquals("http.multipart.filename", "my-file-binary.bin");
+ mff.assertAttributeEquals("http.headers.multipart.content-type", "application/octet-stream");
+ mff.assertAttributeExists("http.multipart.size");
+ mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+ mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+ mff.assertAttributeExists("http.headers.multipart.content-disposition");
+ } finally {
+ // shut down the server
+ runner.run(1, true);
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testMultipartFormDataRequestFailToRegisterContext() throws InitializationException, MalformedURLException, IOException, InterruptedException {
+ final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+ runner.setProperty(HandleHttpRequest.PORT, "0");
+
+ final MockHttpContextMap contextMap = new MockHttpContextMap();
+ contextMap.setRegisterSuccessfully(false);
+ runner.addControllerService("http-context-map", contextMap);
+ runner.enableControllerService(contextMap);
+ runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+
+ // trigger processor to stop but not shutdown.
+ runner.run(1, false);
+ try {
+ AtomicInteger responseCode = new AtomicInteger(0);
+ final Thread httpThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+
+ final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+
+ MultipartBody multipartBody = new MultipartBody.Builder()
+ .setType(MultipartBody.FORM)
+ .addFormDataPart("p1", "v1")
+ .addFormDataPart("p2", "v2")
+ .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+ .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }")))
+ .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+ .build();
+
+ Request request = new Request.Builder()
+ .url(String.format("http://localhost:%s/my/path", port))
+ .post(multipartBody).build();
+
+ OkHttpClient client =
+ new OkHttpClient.Builder()
+ .readTimeout(20000, TimeUnit.MILLISECONDS)
+ .writeTimeout(20000, TimeUnit.MILLISECONDS)
+ .build();
+
+ try (Response response = client.newCall(request).execute()) {
+ responseCode.set(response.code());
+ }
+ } catch (final Throwable t) {
+ t.printStackTrace();
+ Assert.fail(t.toString());
+ }
+ }
+ });
+ httpThread.start();
+
+ while (responseCode.get() == 0) {
+ // process the request.
+ runner.run(1, false, false);
+ }
+
+ runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 0);
+ assertEquals(0, contextMap.size());
+ Assert.assertEquals(503, responseCode.get());
+ } finally {
+ // shut down the server
+ runner.run(1, true);
+ }
+ }
+
+ private byte[] generateRandomBinaryData(int i) {
+ byte[] bytes = new byte[100];
+ new Random().nextBytes(bytes);
+ return bytes;
+ }
+
+
+ private File createTextFile(String fileName, String... lines) throws IOException {
+ File file = new File(fileName);
+ file.deleteOnExit();
+ for (String string : lines) {
+ Files.append(string, file, Charsets.UTF_8);
+ }
+ return file;
+ }
+
+
+ protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) {
+ Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship, ff -> ff.getAttribute(attributeName).equals(attributeValue));
+ Assert.assertTrue(optional.isPresent());
+ return optional.get();
+ }
+
+
+ @Test(timeout=30000)
+ public void testFailToRegister() throws InitializationException, MalformedURLException, IOException, InterruptedException {
+ final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+ runner.setProperty(HandleHttpRequest.PORT, "0");
+
+ final MockHttpContextMap contextMap = new MockHttpContextMap();
+ runner.addControllerService("http-context-map", contextMap);
+ runner.enableControllerService(contextMap);
+ runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+ contextMap.setRegisterSuccessfully(false);
+
+ // trigger processor to stop but not shutdown.
+ runner.run(1, false);
+ try {
+ final int[] responseCode = new int[1];
+ responseCode[0] = 0;
+ final Thread httpThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ HttpURLConnection connection = null;
+ try {
+ final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+ connection = (HttpURLConnection) new URL("http://localhost:"
+ + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
+ connection.setDoOutput(false);
+ connection.setRequestMethod("GET");
+ connection.setRequestProperty("header1", "value1");
+ connection.setRequestProperty("header2", "");
+ connection.setRequestProperty("header3", "apple=orange");
+ connection.setConnectTimeout(20000);
+ connection.setReadTimeout(20000);
+
+ StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
+ } catch (final Throwable t) {
+ t.printStackTrace();
+ if(connection != null ) {
+ try {
+ responseCode[0] = connection.getResponseCode();
+ } catch (IOException e) {
+ responseCode[0] = -1;
+ }
+ } else {
+ responseCode[0] = -2;
+ }
+ }
+ }
+ });
+ httpThread.start();
+
+ while (responseCode[0] == 0) {
+ // process the request.
+ runner.run(1, false, false);
+ }
+
+ runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0);
+ assertEquals(503, responseCode[0]);
+
+ } finally {
+ // shut down the server
+ runner.run(1, true);
+ }
+ }
+
+ @Test
+ public void testSecure() throws InitializationException {
+ final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+ runner.setProperty(HandleHttpRequest.PORT, "0");
+
+ final MockHttpContextMap contextMap = new MockHttpContextMap();
+ runner.addControllerService("http-context-map", contextMap);
+ runner.enableControllerService(contextMap);
+ runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+
+ final Map<String, String> sslProperties = getKeystoreProperties();
+ sslProperties.putAll(getTruststoreProperties());
+ sslProperties.put(StandardSSLContextService.SSL_ALGORITHM.getName(), "TLSv1.2");
+ final SSLContext sslContext = useSSLContextService(runner, sslProperties);
+
+ // trigger processor to stop but not shutdown.
+ runner.run(1, false);
+ try {
+ final Thread httpThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+ final HttpsURLConnection connection = (HttpsURLConnection) new URL("https://localhost:"
+ + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
+
+ connection.setSSLSocketFactory(sslContext.getSocketFactory());
+ connection.setDoOutput(false);
+ connection.setRequestMethod("GET");
+ connection.setRequestProperty("header1", "value1");
+ connection.setRequestProperty("header2", "");
+ connection.setRequestProperty("header3", "apple=orange");
+ connection.setConnectTimeout(3000);
+ connection.setReadTimeout(3000);
+
+ StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
+ } catch (final Throwable t) {
+ t.printStackTrace();
+ Assert.fail(t.toString());
+ }
+ }
+ });
+ httpThread.start();
+
+ while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
+ // process the request.
+ runner.run(1, false, false);
+ }
+
+ runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
+ assertEquals(1, contextMap.size());
+
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
+ mff.assertAttributeEquals("http.query.param.query", "true");
+ mff.assertAttributeEquals("http.query.param.value1", "value1");
+ mff.assertAttributeEquals("http.query.param.value2", "");
+ mff.assertAttributeEquals("http.query.param.value3", "");
+ mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
+ mff.assertAttributeEquals("http.headers.header1", "value1");
+ mff.assertAttributeEquals("http.headers.header3", "apple=orange");
+ mff.assertAttributeEquals("http.protocol", "HTTP/1.1");
+ } finally {
+ // shut down the server
+ runner.run(1, true);
+ }
+ }
+
+ private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {
+
+ private boolean registerSuccessfully = true;
+
+ private final ConcurrentMap<String, HttpServletResponse> responseMap = new ConcurrentHashMap<>();
+
+ @Override
+ public boolean register(final String identifier, final HttpServletRequest request, final HttpServletResponse response, final AsyncContext context) {
+ if(registerSuccessfully) {
+ responseMap.put(identifier, response);
+ }
+ return registerSuccessfully;
+ }
+
+ @Override
+ public HttpServletResponse getResponse(final String identifier) {
+ return responseMap.get(identifier);
+ }
+
+ @Override
+ public void complete(final String identifier) {
+ responseMap.remove(identifier);
+ }
+
+ public int size() {
+ return responseMap.size();
+ }
+
+ public boolean isRegisterSuccessfully() {
+ return registerSuccessfully;
+ }
+
+ public void setRegisterSuccessfully(boolean registerSuccessfully) {
+ this.registerSuccessfully = registerSuccessfully;
+ }
+
+ @Override
+ public long getRequestTimeout(TimeUnit timeUnit) {
+ return timeUnit.convert(30000, TimeUnit.MILLISECONDS);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/2a4745c0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
deleted file mode 100644
index 3c64bff..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
+++ /dev/null
@@ -1,536 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import com.google.api.client.util.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.io.Files;
-import okhttp3.MediaType;
-import okhttp3.MultipartBody;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.http.HttpContextMap;
-import org.apache.nifi.processors.standard.util.HTTPUtils;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
-import org.apache.nifi.ssl.StandardSSLContextService;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLContext;
-import javax.servlet.AsyncContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.File;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestHandleHttpRequest {
-
- private static Map<String, String> getTruststoreProperties() {
- final Map<String, String> props = new HashMap<>();
- props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/truststore.jks");
- props.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "passwordpassword");
- props.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
- return props;
- }
-
- private static Map<String, String> getKeystoreProperties() {
- final Map<String, String> properties = new HashMap<>();
- properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/keystore.jks");
- properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "passwordpassword");
- properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
- return properties;
- }
-
- private static SSLContext useSSLContextService(final TestRunner controller, final Map<String, String> sslProperties) {
- final SSLContextService service = new StandardRestrictedSSLContextService();
- try {
- controller.addControllerService("ssl-service", service, sslProperties);
- controller.enableControllerService(service);
- } catch (InitializationException ex) {
- ex.printStackTrace();
- Assert.fail("Could not create SSL Context Service");
- }
-
- controller.setProperty(HandleHttpRequest.SSL_CONTEXT, "ssl-service");
- return service.createSSLContext(SSLContextService.ClientAuth.WANT);
- }
-
- @Test(timeout=30000)
- public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException, InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
- runner.setProperty(HandleHttpRequest.PORT, "0");
-
- final MockHttpContextMap contextMap = new MockHttpContextMap();
- runner.addControllerService("http-context-map", contextMap);
- runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
-
- // trigger processor to stop but not shutdown.
- runner.run(1, false);
- try {
- final Thread httpThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
- final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:"
- + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
- connection.setDoOutput(false);
- connection.setRequestMethod("GET");
- connection.setRequestProperty("header1", "value1");
- connection.setRequestProperty("header2", "");
- connection.setRequestProperty("header3", "apple=orange");
- connection.setConnectTimeout(3000);
- connection.setReadTimeout(3000);
-
- StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
- } catch (final Throwable t) {
- t.printStackTrace();
- Assert.fail(t.toString());
- }
- }
- });
- httpThread.start();
-
- while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
- // process the request.
- runner.run(1, false, false);
- }
-
- runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
- assertEquals(1, contextMap.size());
-
- final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
- mff.assertAttributeEquals("http.query.param.query", "true");
- mff.assertAttributeEquals("http.query.param.value1", "value1");
- mff.assertAttributeEquals("http.query.param.value2", "");
- mff.assertAttributeEquals("http.query.param.value3", "");
- mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
- mff.assertAttributeEquals("http.headers.header1", "value1");
- mff.assertAttributeEquals("http.headers.header3", "apple=orange");
- } finally {
- // shut down the server
- runner.run(1, true);
- }
- }
-
-
- @Test(timeout=30000)
- public void testMultipartFormDataRequest() throws InitializationException, MalformedURLException, IOException, InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
- runner.setProperty(HandleHttpRequest.PORT, "0");
-
- final MockHttpContextMap contextMap = new MockHttpContextMap();
- runner.addControllerService("http-context-map", contextMap);
- runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
-
- // trigger processor to stop but not shutdown.
- runner.run(1, false);
- try {
- final Thread httpThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
-
- final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
-
- MultipartBody multipartBody = new MultipartBody.Builder()
- .setType(MultipartBody.FORM)
- .addFormDataPart("p1", "v1")
- .addFormDataPart("p2", "v2")
- .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
- .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }")))
- .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
- .build();
-
- Request request = new Request.Builder()
- .url(String.format("http://localhost:%s/my/path", port))
- .post(multipartBody).build();
-
- OkHttpClient client =
- new OkHttpClient.Builder()
- .readTimeout(3000, TimeUnit.MILLISECONDS)
- .writeTimeout(3000, TimeUnit.MILLISECONDS)
- .build();
-
- try (Response response = client.newCall(request).execute()) {
- Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful());
- }
- } catch (final Throwable t) {
- t.printStackTrace();
- Assert.fail(t.toString());
- }
- }
- });
- httpThread.start();
-
- while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
- // process the request.
- runner.run(1, false, false);
- }
-
- runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 5);
- assertEquals(1, contextMap.size());
-
- List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS);
-
- // Part fragments are not processed in the order we submitted them.
- // We cannot rely on the order we sent them in.
- MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p1");
- String contextId = mff.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
- mff.assertAttributeEquals("http.multipart.name", "p1");
- mff.assertAttributeExists("http.multipart.size");
- mff.assertAttributeEquals("http.multipart.fragments.sequence.number", "1");
- mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
- mff.assertAttributeExists("http.headers.multipart.content-disposition");
-
-
- mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p2");
- // each part generates a corresponding flow file - yet all parts are coming from the same request,
- mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
- mff.assertAttributeEquals("http.multipart.name", "p2");
- mff.assertAttributeExists("http.multipart.size");
- mff.assertAttributeExists("http.multipart.fragments.sequence.number");
- mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
- mff.assertAttributeExists("http.headers.multipart.content-disposition");
-
-
- mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file1");
- mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
- mff.assertAttributeEquals("http.multipart.name", "file1");
- mff.assertAttributeEquals("http.multipart.filename", "my-file-text.txt");
- mff.assertAttributeEquals("http.headers.multipart.content-type", "text/plain");
- mff.assertAttributeExists("http.multipart.size");
- mff.assertAttributeExists("http.multipart.fragments.sequence.number");
- mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
- mff.assertAttributeExists("http.headers.multipart.content-disposition");
-
-
- mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file2");
- mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
- mff.assertAttributeEquals("http.multipart.name", "file2");
- mff.assertAttributeEquals("http.multipart.filename", "my-file-data.json");
- mff.assertAttributeEquals("http.headers.multipart.content-type", "application/json");
- mff.assertAttributeExists("http.multipart.size");
- mff.assertAttributeExists("http.multipart.fragments.sequence.number");
- mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
- mff.assertAttributeExists("http.headers.multipart.content-disposition");
-
-
- mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file3");
- mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
- mff.assertAttributeEquals("http.multipart.name", "file3");
- mff.assertAttributeEquals("http.multipart.filename", "my-file-binary.bin");
- mff.assertAttributeEquals("http.headers.multipart.content-type", "application/octet-stream");
- mff.assertAttributeExists("http.multipart.size");
- mff.assertAttributeExists("http.multipart.fragments.sequence.number");
- mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
- mff.assertAttributeExists("http.headers.multipart.content-disposition");
- } finally {
- // shut down the server
- runner.run(1, true);
- }
- }
-
- @Test(timeout=30000)
- public void testMultipartFormDataRequestFailToRegisterContext() throws InitializationException, MalformedURLException, IOException, InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
- runner.setProperty(HandleHttpRequest.PORT, "0");
-
- final MockHttpContextMap contextMap = new MockHttpContextMap();
- contextMap.setRegisterSuccessfully(false);
- runner.addControllerService("http-context-map", contextMap);
- runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
-
- // trigger processor to stop but not shutdown.
- runner.run(1, false);
- try {
- AtomicInteger responseCode = new AtomicInteger(0);
- final Thread httpThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
-
- final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
-
- MultipartBody multipartBody = new MultipartBody.Builder()
- .setType(MultipartBody.FORM)
- .addFormDataPart("p1", "v1")
- .addFormDataPart("p2", "v2")
- .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
- .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }")))
- .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
- .build();
-
- Request request = new Request.Builder()
- .url(String.format("http://localhost:%s/my/path", port))
- .post(multipartBody).build();
-
- OkHttpClient client =
- new OkHttpClient.Builder()
- .readTimeout(20000, TimeUnit.MILLISECONDS)
- .writeTimeout(20000, TimeUnit.MILLISECONDS)
- .build();
-
- try (Response response = client.newCall(request).execute()) {
- responseCode.set(response.code());
- }
- } catch (final Throwable t) {
- t.printStackTrace();
- Assert.fail(t.toString());
- }
- }
- });
- httpThread.start();
-
- while (responseCode.get() == 0) {
- // process the request.
- runner.run(1, false, false);
- }
-
- runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 0);
- assertEquals(0, contextMap.size());
- Assert.assertEquals(503, responseCode.get());
- } finally {
- // shut down the server
- runner.run(1, true);
- }
- }
-
- private byte[] generateRandomBinaryData(int i) {
- byte[] bytes = new byte[100];
- new Random().nextBytes(bytes);
- return bytes;
- }
-
-
- private File createTextFile(String fileName, String... lines) throws IOException {
- File file = new File(fileName);
- file.deleteOnExit();
- for (String string : lines) {
- Files.append(string, file, Charsets.UTF_8);
- }
- return file;
- }
-
-
- protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) {
- Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship, ff -> ff.getAttribute(attributeName).equals(attributeValue));
- Assert.assertTrue(optional.isPresent());
- return optional.get();
- }
-
-
- @Test(timeout=30000)
- public void testFailToRegister() throws InitializationException, MalformedURLException, IOException, InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
- runner.setProperty(HandleHttpRequest.PORT, "0");
-
- final MockHttpContextMap contextMap = new MockHttpContextMap();
- runner.addControllerService("http-context-map", contextMap);
- runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
- contextMap.setRegisterSuccessfully(false);
-
- // trigger processor to stop but not shutdown.
- runner.run(1, false);
- try {
- final int[] responseCode = new int[1];
- responseCode[0] = 0;
- final Thread httpThread = new Thread(new Runnable() {
- @Override
- public void run() {
- HttpURLConnection connection = null;
- try {
- final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
- connection = (HttpURLConnection) new URL("http://localhost:"
- + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
- connection.setDoOutput(false);
- connection.setRequestMethod("GET");
- connection.setRequestProperty("header1", "value1");
- connection.setRequestProperty("header2", "");
- connection.setRequestProperty("header3", "apple=orange");
- connection.setConnectTimeout(20000);
- connection.setReadTimeout(20000);
-
- StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
- } catch (final Throwable t) {
- t.printStackTrace();
- if(connection != null ) {
- try {
- responseCode[0] = connection.getResponseCode();
- } catch (IOException e) {
- responseCode[0] = -1;
- }
- } else {
- responseCode[0] = -2;
- }
- }
- }
- });
- httpThread.start();
-
- while (responseCode[0] == 0) {
- // process the request.
- runner.run(1, false, false);
- }
-
- runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0);
- assertEquals(503, responseCode[0]);
-
- } finally {
- // shut down the server
- runner.run(1, true);
- }
- }
-
- @Test
- public void testSecure() throws InitializationException {
- final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
- runner.setProperty(HandleHttpRequest.PORT, "0");
-
- final MockHttpContextMap contextMap = new MockHttpContextMap();
- runner.addControllerService("http-context-map", contextMap);
- runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
-
- final Map<String, String> sslProperties = getKeystoreProperties();
- sslProperties.putAll(getTruststoreProperties());
- sslProperties.put(StandardSSLContextService.SSL_ALGORITHM.getName(), "TLSv1.2");
- final SSLContext sslContext = useSSLContextService(runner, sslProperties);
-
- // trigger processor to stop but not shutdown.
- runner.run(1, false);
- try {
- final Thread httpThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
- final HttpsURLConnection connection = (HttpsURLConnection) new URL("https://localhost:"
- + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
-
- connection.setSSLSocketFactory(sslContext.getSocketFactory());
- connection.setDoOutput(false);
- connection.setRequestMethod("GET");
- connection.setRequestProperty("header1", "value1");
- connection.setRequestProperty("header2", "");
- connection.setRequestProperty("header3", "apple=orange");
- connection.setConnectTimeout(3000);
- connection.setReadTimeout(3000);
-
- StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
- } catch (final Throwable t) {
- t.printStackTrace();
- Assert.fail(t.toString());
- }
- }
- });
- httpThread.start();
-
- while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
- // process the request.
- runner.run(1, false, false);
- }
-
- runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
- assertEquals(1, contextMap.size());
-
- final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
- mff.assertAttributeEquals("http.query.param.query", "true");
- mff.assertAttributeEquals("http.query.param.value1", "value1");
- mff.assertAttributeEquals("http.query.param.value2", "");
- mff.assertAttributeEquals("http.query.param.value3", "");
- mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
- mff.assertAttributeEquals("http.headers.header1", "value1");
- mff.assertAttributeEquals("http.headers.header3", "apple=orange");
- mff.assertAttributeEquals("http.protocol", "HTTP/1.1");
- } finally {
- // shut down the server
- runner.run(1, true);
- }
- }
-
- private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {
-
- private boolean registerSuccessfully = true;
-
- private final ConcurrentMap<String, HttpServletResponse> responseMap = new ConcurrentHashMap<>();
-
- @Override
- public boolean register(final String identifier, final HttpServletRequest request, final HttpServletResponse response, final AsyncContext context) {
- if(registerSuccessfully) {
- responseMap.put(identifier, response);
- }
- return registerSuccessfully;
- }
-
- @Override
- public HttpServletResponse getResponse(final String identifier) {
- return responseMap.get(identifier);
- }
-
- @Override
- public void complete(final String identifier) {
- responseMap.remove(identifier);
- }
-
- public int size() {
- return responseMap.size();
- }
-
- public boolean isRegisterSuccessfully() {
- return registerSuccessfully;
- }
-
- public void setRegisterSuccessfully(boolean registerSuccessfully) {
- this.registerSuccessfully = registerSuccessfully;
- }
-
- @Override
- public long getRequestTimeout(TimeUnit timeUnit) {
- return timeUnit.convert(30000, TimeUnit.MILLISECONDS);
- }
- }
-}