You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/26 15:19:43 UTC
[19/48] incubator-nifi git commit: NIFI-6: Rebase from develop to
include renaming of directory structure
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
index 0000000,4fae6ed..4a74416
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
@@@ -1,0 -1,206 +1,208 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
-import org.apache.nifi.processors.standard.DetectDuplicate;
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.nio.charset.StandardCharsets;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+
++import org.apache.commons.lang3.SerializationException;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.controller.AbstractControllerService;
+ import org.apache.nifi.distributed.cache.client.Deserializer;
+ import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+ import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+ import org.apache.nifi.distributed.cache.client.Serializer;
+ import org.apache.nifi.reporting.InitializationException;
+ import org.apache.nifi.util.MockControllerServiceInitializationContext;
+ import org.apache.nifi.util.TestRunner;
+ import org.apache.nifi.util.TestRunners;
-
-import org.apache.commons.lang3.SerializationException;
+ import org.junit.Test;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class TestDetectDuplicate {
+
+ private static Logger LOGGER;
+
+ static {
+ System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+ System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+ System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+ System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DetectDuplicate", "debug");
+ System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestDetectDuplicate", "debug");
+ LOGGER = LoggerFactory.getLogger(TestListenUDP.class);
+ }
+
+ @Test
+ public void testDuplicate() throws InitializationException {
-
+ TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
+ final DistributedMapCacheClientImpl client = createClient();
+ final Map<String, String> clientProperties = new HashMap<>();
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
+ runner.addControllerService("client", client, clientProperties);
+ runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
+ runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
+ runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "48 hours");
+ Map<String, String> props = new HashMap<>();
+ props.put("hash.value", "1000");
+ runner.enqueue(new byte[]{}, props);
++ runner.enableControllerService(client);
++
+ runner.run();
+ runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1);
+ runner.clearTransferState();
+ client.exists = true;
+ runner.enqueue(new byte[]{}, props);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_DUPLICATE, 1);
+ runner.assertTransferCount(DetectDuplicate.REL_NON_DUPLICATE, 0);
+ runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0);
+ }
+
+ @Test
+ public void testDuplicateWithAgeOff() throws InitializationException, InterruptedException {
+
+ TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
+ final DistributedMapCacheClientImpl client = createClient();
+ final Map<String, String> clientProperties = new HashMap<>();
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
+ runner.addControllerService("client", client, clientProperties);
+ runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
+ runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
+ runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "2 secs");
++ runner.enableControllerService(client);
++
+ Map<String, String> props = new HashMap<>();
+ props.put("hash.value", "1000");
+ runner.enqueue(new byte[]{}, props);
++
+ runner.run();
+ runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1);
+ runner.clearTransferState();
+ client.exists = true;
+ Thread.sleep(3000);
+ runner.enqueue(new byte[]{}, props);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1);
+ runner.assertTransferCount(DetectDuplicate.REL_DUPLICATE, 0);
+ runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0);
+ }
+
+ private DistributedMapCacheClientImpl createClient() throws InitializationException {
+
+ final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl();
+ MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
+ client.initialize(clientInitContext);
+
+ return client;
+ }
+
+ static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient {
+
+ boolean exists = false;
+ private Object cacheValue;
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+ }
+
+ @Override
+ protected java.util.List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(DistributedMapCacheClientService.HOSTNAME);
+ props.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT);
+ props.add(DistributedMapCacheClientService.PORT);
+ props.add(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE);
+ return props;
+ }
+
+ @Override
+ public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ if (exists) {
+ return false;
+ }
+
+ cacheValue = value;
+ return true;
+ }
+
+ @Override
+ public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer,
+ Deserializer<V> valueDeserializer) throws IOException {
+ if (exists) {
+ return (V) cacheValue;
+ }
+ cacheValue = value;
+ return null;
+ }
+
+ @Override
+ public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+ return exists;
+ }
+
+ @Override
+ public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+ return null;
+ }
+
+ @Override
+ public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+ exists = false;
+ return true;
+ }
+ }
+
+ private static class StringSerializer implements Serializer<String> {
+
+ @Override
+ public void serialize(final String value, final OutputStream output) throws SerializationException, IOException {
+ output.write(value.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ private static void deleteRecursively(final File dataFile) throws IOException {
+ if (dataFile == null || !dataFile.exists()) {
+ return;
+ }
+
+ final File[] children = dataFile.listFiles();
+ for (final File child : children) {
+ if (child.isDirectory()) {
+ deleteRecursively(child);
+ } else {
+ for (int i = 0; i < 100 && child.exists(); i++) {
+ child.delete();
+ }
+
+ if (child.exists()) {
+ throw new IOException("Could not delete " + dataFile.getAbsolutePath());
+ }
+ }
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
index 0000000,63bdcf8..f52b212
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
@@@ -1,0 -1,354 +1,340 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.Properties;
+
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.reporting.InitializationException;
+ import org.apache.nifi.ssl.SSLContextService;
+ import org.apache.nifi.ssl.StandardSSLContextService;
-import org.apache.nifi.util.MockControllerServiceInitializationContext;
+ import org.apache.nifi.util.MockFlowFile;
+ import org.apache.nifi.util.MockProcessContext;
+ import org.apache.nifi.util.MockProcessorInitializationContext;
+ import org.apache.nifi.util.TestRunner;
+ import org.apache.nifi.util.TestRunners;
+ import org.eclipse.jetty.servlet.ServletHandler;
+ import org.junit.AfterClass;
+ import org.junit.Assert;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+ /**
+ * @author unattributed
+ *
+ */
+ public class TestGetHTTP {
+
- private static Logger LOGGER;
+ private TestRunner controller;
+
+ @BeforeClass
+ public static void before() {
+ System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+ System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+ System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.GetHTTP", "debug");
+ System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestGetHTTP", "debug");
- LOGGER = LoggerFactory.getLogger(TestGetHTTP.class);
+ File confDir = new File("conf");
+ if (!confDir.exists()) {
+ confDir.mkdir();
+ }
+ }
+
+ @AfterClass
+ public static void after() {
+ File confDir = new File("conf");
+ assertTrue(confDir.exists());
+ File[] files = confDir.listFiles();
+ assertTrue(files.length > 0);
+ for (File file : files) {
+ assertTrue("Failed to delete " + file.getName(), file.delete());
+ }
+ assertTrue(confDir.delete());
+ }
+
- private static Map<String, String> createSslProperties() {
- Map<String, String> map = new HashMap<String, String>();
- map.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
- map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
- map.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
- map.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
- map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
- map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
- return map;
- }
+
+ @Test
+ public final void testContentModified() throws Exception {
+ // set up web service
+ ServletHandler handler = new ServletHandler();
+ handler.addServletWithMapping(RESTServiceContentModified.class, "/*");
+
+ // create the service
+ TestServer server = new TestServer();
+ server.addHandler(handler);
+
+ try {
+ server.startServer();
+
+ // this is the base url with the random port
+ String destination = server.getUrl();
+
+ // set up NiFi mock controller
+ controller = TestRunners.newTestRunner(GetHTTP.class);
+ controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
+ controller.setProperty(GetHTTP.URL, destination);
+ controller.setProperty(GetHTTP.FILENAME, "testFile");
+ controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
+
+ GetHTTP getHTTPProcessor = (GetHTTP) controller.getProcessor();
+ assertEquals("", getHTTPProcessor.entityTagRef.get());
+ assertEquals("Thu, 01 Jan 1970 00:00:00 GMT", getHTTPProcessor.lastModifiedRef.get());
+ controller.run(2);
+
+ // verify the lastModified and entityTag are updated
+ assertFalse("".equals(getHTTPProcessor.entityTagRef.get()));
+ assertFalse("Thu, 01 Jan 1970 00:00:00 GMT".equals(getHTTPProcessor.lastModifiedRef.get()));
+ // ran twice, but got one...which is good
+ controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
+
+ // verify remote.source flowfile attribute
+ controller.getFlowFilesForRelationship(GetHTTP.REL_SUCCESS).get(0).assertAttributeEquals("gethttp.remote.source", "localhost");
+
+ controller.clearTransferState();
+
+ // turn off checking for etag and lastModified
+ RESTServiceContentModified.IGNORE_ETAG = true;
+ RESTServiceContentModified.IGNORE_LAST_MODIFIED = true;
+ controller.run(2);
+ // ran twice, got two...which is good
+ controller.assertTransferCount(GetHTTP.REL_SUCCESS, 2);
+ controller.clearTransferState();
+
+ // turn on checking for etag
+ RESTServiceContentModified.IGNORE_ETAG = false;
+ controller.run(2);
+ // ran twice, got 0...which is good
+ controller.assertTransferCount(GetHTTP.REL_SUCCESS, 0);
+
+ // turn on checking for lastModified, but off for etag
+ RESTServiceContentModified.IGNORE_LAST_MODIFIED = false;
+ RESTServiceContentModified.IGNORE_ETAG = true;
+ controller.run(2);
+ // ran twice, got 0...which is good
+ controller.assertTransferCount(GetHTTP.REL_SUCCESS, 0);
+
+ // turn off checking for lastModified, turn on checking for etag, but change the value
+ RESTServiceContentModified.IGNORE_LAST_MODIFIED = true;
+ RESTServiceContentModified.IGNORE_ETAG = false;
+ RESTServiceContentModified.ETAG = 1;
+ controller.run(2);
+ // ran twice, got 1...but should have new cached etag
+ controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
+ assertEquals("1", getHTTPProcessor.entityTagRef.get());
+ controller.clearTransferState();
+
+ // turn off checking for Etag, turn on checking for lastModified, but change value
+ RESTServiceContentModified.IGNORE_LAST_MODIFIED = false;
+ RESTServiceContentModified.IGNORE_ETAG = true;
+ RESTServiceContentModified.modificationDate = System.currentTimeMillis() / 1000 * 1000 + 5000;
+ String lastMod = getHTTPProcessor.lastModifiedRef.get();
+ controller.run(2);
+ // ran twice, got 1...but should have new cached lastModified
+ controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
+ assertFalse(lastMod.equals(getHTTPProcessor.lastModifiedRef.get()));
+ controller.clearTransferState();
+
+ // shutdown web service
+ } finally {
+ server.shutdownServer();
+ }
+ }
+
+ @Test
+ public void testPersistEtagLastMod() throws Exception {
+ // delete the config file
+ File confDir = new File("conf");
+ File[] files = confDir.listFiles();
+ for (File file : files) {
+ assertTrue("Failed to delete " + file.getName(), file.delete());
+ }
+
+ // set up web service
+ ServletHandler handler = new ServletHandler();
+ handler.addServletWithMapping(RESTServiceContentModified.class, "/*");
+
+ // create the service
+ TestServer server = new TestServer();
+ server.addHandler(handler);
+
+ try {
+ server.startServer();
+
+ // get the server url
+ String destination = server.getUrl();
+
+ // set up NiFi mock controller
+ controller = TestRunners.newTestRunner(GetHTTP.class);
+ controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
+ controller.setProperty(GetHTTP.FILENAME, "testFile");
+ controller.setProperty(GetHTTP.URL, destination);
+ controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
+
+ GetHTTP getHTTPProcessor = (GetHTTP) controller.getProcessor();
+
+ assertEquals("", getHTTPProcessor.entityTagRef.get());
+ assertEquals("Thu, 01 Jan 1970 00:00:00 GMT", getHTTPProcessor.lastModifiedRef.get());
+ controller.run(2);
+
+ // verify the lastModified and entityTag are updated
+ String etag = getHTTPProcessor.entityTagRef.get();
+ assertFalse("".equals(etag));
+ String lastMod = getHTTPProcessor.lastModifiedRef.get();
+ assertFalse("Thu, 01 Jan 1970 00:00:00 GMT".equals(lastMod));
+ // ran twice, but got one...which is good
+ controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
+ controller.clearTransferState();
+
+ files = confDir.listFiles();
+ assertEquals(1, files.length);
+ File file = files[0];
+ assertTrue(file.exists());
+ Properties props = new Properties();
+ FileInputStream fis = new FileInputStream(file);
+ props.load(fis);
+ fis.close();
+ assertEquals(etag, props.getProperty(GetHTTP.ETAG));
+ assertEquals(lastMod, props.getProperty(GetHTTP.LAST_MODIFIED));
+
+ ProcessorInitializationContext pic = new MockProcessorInitializationContext(controller.getProcessor(),
+ (MockProcessContext) controller.getProcessContext());
+ // init causes read from file
+ getHTTPProcessor.init(pic);
+ assertEquals(etag, getHTTPProcessor.entityTagRef.get());
+ assertEquals(lastMod, getHTTPProcessor.lastModifiedRef.get());
+ controller.run(2);
+ // ran twice, got none...which is good
+ controller.assertTransferCount(GetHTTP.REL_SUCCESS, 0);
+ controller.clearTransferState();
+ files = confDir.listFiles();
+ assertEquals(1, files.length);
+ file = files[0];
+ assertTrue(file.exists());
+ props = new Properties();
+ fis = new FileInputStream(file);
+ props.load(fis);
+ fis.close();
+ assertEquals(etag, props.getProperty(GetHTTP.ETAG));
+ assertEquals(lastMod, props.getProperty(GetHTTP.LAST_MODIFIED));
+
+ // shutdown web service
+ } finally {
+ server.shutdownServer();
+ }
+ }
+
+ @Test
+ public final void testUserAgent() throws Exception {
+ // set up web service
+ ServletHandler handler = new ServletHandler();
+ handler.addServletWithMapping(UserAgentTestingServlet.class, "/*");
+
+ // create the service
+ TestServer server = new TestServer();
+ server.addHandler(handler);
+
+ try {
+ server.startServer();
+
+ String destination = server.getUrl();
+
+ // set up NiFi mock controller
+ controller = TestRunners.newTestRunner(GetHTTP.class);
+ controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
+ controller.setProperty(GetHTTP.URL, destination);
+ controller.setProperty(GetHTTP.FILENAME, "testFile");
+ controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
+
+ controller.run();
+ controller.assertTransferCount(GetHTTP.REL_SUCCESS, 0);
+
+ controller.setProperty(GetHTTP.USER_AGENT, "testUserAgent");
+ controller.run();
+ controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
+
+ // shutdown web service
+ } finally {
+ server.shutdownServer();
+ }
+ }
+
+ private Map<String, String> getSslProperties() {
+ Map<String, String> props = new HashMap<String, String>();
+ props.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
+ props.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
+ props.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
+ props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
+ props.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
+ props.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
+ return props;
+ }
+
+ private void useSSLContextService() {
+ final SSLContextService service = new StandardSSLContextService();
+ try {
+ controller.addControllerService("ssl-service", service, getSslProperties());
++ controller.enableControllerService(service);
+ } catch (InitializationException ex) {
+ ex.printStackTrace();
+ Assert.fail("Could not create SSL Context Service");
+ }
+
+ controller.setProperty(GetHTTP.SSL_CONTEXT_SERVICE, "ssl-service");
+ }
+
+ @Test
+ public final void testSecure() throws Exception {
+ // set up web service
+ ServletHandler handler = new ServletHandler();
+ handler.addServletWithMapping(HelloWorldServlet.class, "/*");
+
+ // create the service
+ TestServer server = new TestServer(getSslProperties());
+ server.addHandler(handler);
+
+ try {
+ server.startServer();
+
+ String destination = server.getSecureUrl();
+
+ // set up NiFi mock controller
+ controller = TestRunners.newTestRunner(GetHTTP.class);
+ useSSLContextService();
+
+ controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
+ controller.setProperty(GetHTTP.URL, destination);
+ controller.setProperty(GetHTTP.FILENAME, "testFile");
+ controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
+
+ controller.run();
+ controller.assertAllFlowFilesTransferred(GetHTTP.REL_SUCCESS, 1);
+ final MockFlowFile mff = controller.getFlowFilesForRelationship(GetHTTP.REL_SUCCESS).get(0);
+ mff.assertContentEquals("Hello, World!");
+ } finally {
+ server.shutdownServer();
+ }
+ }
+
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
index 0000000,b98ba13..e5950cd
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
@@@ -1,0 -1,593 +1,595 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+
+ import static org.junit.Assert.*;
+
+ import java.io.IOException;
+ import java.io.PrintWriter;
+ import java.io.UnsupportedEncodingException;
+ import java.nio.charset.StandardCharsets;
+ import java.text.SimpleDateFormat;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.Locale;
+ import java.util.Map;
+
+ import javax.servlet.ServletException;
+ import javax.servlet.http.HttpServletRequest;
+ import javax.servlet.http.HttpServletResponse;
+
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.processors.standard.InvokeHTTP.Config;
+ import org.apache.nifi.ssl.StandardSSLContextService;
+ import org.apache.nifi.util.MockFlowFile;
+ import org.apache.nifi.util.TestRunner;
+ import org.apache.nifi.util.TestRunners;
+ import org.eclipse.jetty.server.Handler;
+ import org.eclipse.jetty.server.Request;
+ import org.eclipse.jetty.server.handler.AbstractHandler;
+ import org.junit.After;
+ import org.junit.AfterClass;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+
+ public class TestInvokeHTTP {
+
+ private static Map<String, String> sslProperties;
+ private static TestServer server;
+ private static String url;
+
+ private TestRunner runner;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ // useful for verbose logging output
+ // don't commit this with this property enabled, or any 'mvn test' will be really verbose
+ // System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug");
+
+ // create the SSL properties, which basically store keystore / truststore information
+ // this is used by the StandardSSLContextService and the Jetty Server
+ sslProperties = createSslProperties();
+
+ // create a Jetty server on a random port
+ server = createServer();
+ server.startServer();
+
+ // this is the base url with the random port
+ url = server.getSecureUrl();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ server.shutdownServer();
+ }
+
+ @Before
+ public void before() throws Exception {
+ runner = TestRunners.newTestRunner(InvokeHTTP.class);
- runner.addControllerService("ssl-context", new StandardSSLContextService(), sslProperties);
++ final StandardSSLContextService sslService = new StandardSSLContextService();
++ runner.addControllerService("ssl-context", sslService, sslProperties);
++ runner.enableControllerService(sslService);
+ runner.setProperty(Config.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
+
+ server.clearHandlers();
+ }
+
+ @After
+ public void after() {
+ runner.shutdown();
+ }
+
+ private void addHandler(Handler handler) {
+ server.addHandler(handler);
+ }
+
+ @Test
+ public void testDateGeneration() throws Exception {
+ DateHandler dh = new DateHandler();
+ addHandler(dh);
+
+ runner.setProperty(Config.PROP_URL, url);
+ createFlowFiles(runner);
+ runner.run();
+
+ // extract the date string sent to the server
+ // and store it as a java.util.Date
+ SimpleDateFormat sdf = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.US);
+ Date date = sdf.parse(dh.dateString);
+
+ // calculate the difference between the date string sent by the client and
+ // the current system time -- these should be within a second or two
+ // (just enough time to run the test).
+ //
+ // If the difference is on the order of hours, a timezone conversion
+ // problem is the likely cause.
+ long diff = Math.abs(System.currentTimeMillis() - date.getTime());
+ long threshold = 15000; // 15 seconds
+ if (diff > threshold) {
+ fail("Difference (" + diff + ") was greater than threshold (" + threshold + ")");
+ }
+ System.out.println("diff: " + diff);
+ }
+
+ @Test
+ public void test200() throws Exception {
+ addHandler(new GetOrHeadHandler());
+
+ runner.setProperty(Config.PROP_URL, url + "/status/200");
+
+ createFlowFiles(runner);
+
+ runner.run();
+
+ runner.assertTransferCount(Config.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(Config.REL_SUCCESS_RESP, 1);
+ runner.assertTransferCount(Config.REL_RETRY, 0);
+ runner.assertTransferCount(Config.REL_NO_RETRY, 0);
+ runner.assertTransferCount(Config.REL_FAILURE, 0);
+
+ //expected in request status.code and status.message
+ //original flow file (+attributes)??????????
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0);
+ bundle.assertAttributeEquals(Config.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
+ final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
+ final String expected = "Hello";
+ Assert.assertEquals(expected, actual);
+ bundle.assertAttributeEquals("Foo", "Bar");
+
+ //expected in response
+ //status code, status message, all headers from server response --> ff attributes
+ //server response message body into payload of ff
+ //should not contain any original ff attributes
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0);
+ bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
+ bundle1.assertAttributeEquals(Config.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals("Foo", "Bar");
+ bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+ final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
+ final String expected1 = "/status/200";
+ Assert.assertEquals(expected1, actual1);
+
+ }
+
+ @Test
+ public void test500() throws Exception {
+ addHandler(new GetOrHeadHandler());
+
+ runner.setProperty(Config.PROP_URL, url + "/status/500");
+
+ createFlowFiles(runner);
+
+ runner.run();
+ runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0);
+ runner.assertTransferCount(Config.REL_RETRY, 1);
+ runner.assertTransferCount(Config.REL_NO_RETRY, 0);
+ runner.assertTransferCount(Config.REL_FAILURE, 0);
+
+ //expected in response
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_RETRY).get(0);
+ final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
+ bundle.assertAttributeEquals(Config.STATUS_CODE, "500");
+ bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Server Error");
+ bundle.assertAttributeEquals(Config.RESPONSE_BODY, "/status/500");
+
+ final String expected = "Hello";
+ Assert.assertEquals(expected, actual);
+ bundle.assertAttributeEquals("Foo", "Bar");
+
+ }
+
+ @Test
+ public void test300() throws Exception {
+ addHandler(new GetOrHeadHandler());
+
+ runner.setProperty(Config.PROP_URL, url + "/status/302");
+
+ createFlowFiles(runner);
+
+ runner.run();
+ runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0);
+ runner.assertTransferCount(Config.REL_RETRY, 0);
+ runner.assertTransferCount(Config.REL_NO_RETRY, 1);
+ runner.assertTransferCount(Config.REL_FAILURE, 0);
+ //getMyFlowFiles();
+ //expected in response
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
+ final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
+
+ bundle.assertAttributeEquals(Config.STATUS_CODE, "302");
+ bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Found");
+ final String expected = "Hello";
+ Assert.assertEquals(expected, actual);
+ bundle.assertAttributeEquals("Foo", "Bar");
+
+ }
+
+ /**
+  * A 304 (Not Modified) response is expected on the no-retry relationship, with the original
+  * content and attributes preserved.
+  */
+ @Test
+ public void test304() throws Exception {
+     addHandler(new GetOrHeadHandler());
+     runner.setProperty(Config.PROP_URL, url + "/status/304");
+     createFlowFiles(runner);
+
+     runner.run();
+
+     // exactly one flow file is transferred, and only to "no retry"
+     runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0);
+     runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0);
+     runner.assertTransferCount(Config.REL_RETRY, 0);
+     runner.assertTransferCount(Config.REL_NO_RETRY, 1);
+     runner.assertTransferCount(Config.REL_FAILURE, 0);
+
+     final MockFlowFile rejected = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
+     final String content = new String(rejected.toByteArray(), StandardCharsets.UTF_8);
+
+     rejected.assertAttributeEquals(Config.STATUS_CODE, "304");
+     rejected.assertAttributeEquals(Config.STATUS_MESSAGE, "Not Modified");
+
+     // payload and attribute set by createFlowFiles are still there
+     Assert.assertEquals("Hello", content);
+     rejected.assertAttributeEquals("Foo", "Bar");
+ }
+
+ /**
+  * A 400 (Bad Request) response is expected on the no-retry relationship, carrying the status
+  * and response body as attributes, with the original content and attributes preserved.
+  */
+ @Test
+ public void test400() throws Exception {
+     addHandler(new GetOrHeadHandler());
+     runner.setProperty(Config.PROP_URL, url + "/status/400");
+     createFlowFiles(runner);
+
+     runner.run();
+
+     // exactly one flow file is transferred, and only to "no retry"
+     runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0);
+     runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0);
+     runner.assertTransferCount(Config.REL_RETRY, 0);
+     runner.assertTransferCount(Config.REL_NO_RETRY, 1);
+     runner.assertTransferCount(Config.REL_FAILURE, 0);
+
+     final MockFlowFile rejected = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
+     final String content = new String(rejected.toByteArray(), StandardCharsets.UTF_8);
+
+     // GetOrHeadHandler echoes the request path as the response body
+     rejected.assertAttributeEquals(Config.STATUS_CODE, "400");
+     rejected.assertAttributeEquals(Config.STATUS_MESSAGE, "Bad Request");
+     rejected.assertAttributeEquals(Config.RESPONSE_BODY, "/status/400");
+
+     // payload and attribute set by createFlowFiles are still there
+     Assert.assertEquals("Hello", content);
+     rejected.assertAttributeEquals("Foo", "Bar");
+ }
+
+ /**
+  * An explicit GET answered with 412 (Precondition Failed) is expected on the no-retry
+  * relationship, carrying the status and response body as attributes, with the original content
+  * and attributes preserved.
+  */
+ @Test
+ public void test412() throws Exception {
+     addHandler(new GetOrHeadHandler());
+     runner.setProperty(Config.PROP_URL, url + "/status/412");
+     runner.setProperty(Config.PROP_METHOD, "GET");
+     createFlowFiles(runner);
+
+     runner.run();
+
+     // exactly one flow file is transferred, and only to "no retry"
+     runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0);
+     runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0);
+     runner.assertTransferCount(Config.REL_RETRY, 0);
+     runner.assertTransferCount(Config.REL_NO_RETRY, 1);
+     runner.assertTransferCount(Config.REL_FAILURE, 0);
+
+     final MockFlowFile rejected = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
+     final String content = new String(rejected.toByteArray(), StandardCharsets.UTF_8);
+
+     // GetOrHeadHandler echoes the request path as the response body
+     rejected.assertAttributeEquals(Config.STATUS_CODE, "412");
+     rejected.assertAttributeEquals(Config.STATUS_MESSAGE, "Precondition Failed");
+     rejected.assertAttributeEquals(Config.RESPONSE_BODY, "/status/412");
+
+     // payload and attribute set by createFlowFiles are still there
+     Assert.assertEquals("Hello", content);
+     rejected.assertAttributeEquals("Foo", "Bar");
+ }
+
+ /**
+  * A successful HEAD request is expected to produce two flow files: the untouched original on the
+  * request-success relationship and an empty-bodied one on the response-success relationship that
+  * carries the response's Content-Type.
+  */
+ @Test
+ public void testHead() throws Exception {
+     addHandler(new GetOrHeadHandler());
+     runner.setProperty(Config.PROP_METHOD, "HEAD");
+     runner.setProperty(Config.PROP_URL, url + "/status/200");
+     createFlowFiles(runner);
+
+     runner.run();
+
+     // one flow file on each success relationship, nothing anywhere else
+     runner.assertTransferCount(Config.REL_SUCCESS_REQ, 1);
+     runner.assertTransferCount(Config.REL_SUCCESS_RESP, 1);
+     runner.assertTransferCount(Config.REL_RETRY, 0);
+     runner.assertTransferCount(Config.REL_NO_RETRY, 0);
+     runner.assertTransferCount(Config.REL_FAILURE, 0);
+
+     // request side: original payload and attribute, plus the status attributes
+     final MockFlowFile requestFlowFile = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0);
+     requestFlowFile.assertAttributeEquals(Config.STATUS_CODE, "200");
+     requestFlowFile.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
+     final String requestContent = new String(requestFlowFile.toByteArray(), StandardCharsets.UTF_8);
+     Assert.assertEquals("Hello", requestContent);
+     requestFlowFile.assertAttributeEquals("Foo", "Bar");
+
+     // response side: the handler writes no body for HEAD, so the content must be empty
+     final MockFlowFile responseFlowFile = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0);
+     responseFlowFile.assertContentEquals("".getBytes("UTF-8"));
+     responseFlowFile.assertAttributeEquals(Config.STATUS_CODE, "200");
+     responseFlowFile.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
+     responseFlowFile.assertAttributeEquals("Foo", "Bar");
+     responseFlowFile.assertAttributeEquals("Content-Type", "text/plain");
+     final String responseContent = new String(responseFlowFile.toByteArray(), StandardCharsets.UTF_8);
+     Assert.assertEquals("", responseContent);
+ }
+
+ /**
+  * A successful POST is expected to produce two flow files: the untouched original on the
+  * request-success relationship and an empty one on the response-success relationship without a
+  * Content-Type attribute (PostHandler sets none and writes no body).
+  */
+ @Test
+ public void testPost() throws Exception {
+     addHandler(new PostHandler());
+     runner.setProperty(Config.PROP_METHOD, "POST");
+     runner.setProperty(Config.PROP_URL, url + "/post");
+     createFlowFiles(runner);
+
+     runner.run();
+
+     // one flow file on each success relationship, nothing anywhere else
+     runner.assertTransferCount(Config.REL_SUCCESS_REQ, 1);
+     runner.assertTransferCount(Config.REL_SUCCESS_RESP, 1);
+     runner.assertTransferCount(Config.REL_RETRY, 0);
+     runner.assertTransferCount(Config.REL_NO_RETRY, 0);
+     runner.assertTransferCount(Config.REL_FAILURE, 0);
+
+     // request side: original payload and attribute, plus the status attributes
+     final MockFlowFile requestFlowFile = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0);
+     requestFlowFile.assertAttributeEquals(Config.STATUS_CODE, "200");
+     requestFlowFile.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
+     final String requestContent = new String(requestFlowFile.toByteArray(), StandardCharsets.UTF_8);
+     Assert.assertEquals("Hello", requestContent);
+     requestFlowFile.assertAttributeEquals("Foo", "Bar");
+
+     // response side: empty content, status attributes, no Content-Type
+     final MockFlowFile responseFlowFile = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0);
+     responseFlowFile.assertContentEquals("".getBytes("UTF-8"));
+     responseFlowFile.assertAttributeEquals(Config.STATUS_CODE, "200");
+     responseFlowFile.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
+     responseFlowFile.assertAttributeEquals("Foo", "Bar");
+     responseFlowFile.assertAttributeNotExists("Content-Type");
+
+     final String responseContent = new String(responseFlowFile.toByteArray(), StandardCharsets.UTF_8);
+     Assert.assertEquals("", responseContent);
+ }
+
+ /**
+  * A successful PUT (sent to the same "/post" path that PostHandler checks) is expected to
+  * produce two flow files: the untouched original on the request-success relationship and an
+  * empty one on the response-success relationship without a Content-Type attribute.
+  */
+ @Test
+ public void testPut() throws Exception {
+     addHandler(new PostHandler());
+     runner.setProperty(Config.PROP_METHOD, "PUT");
+     runner.setProperty(Config.PROP_URL, url + "/post");
+     createFlowFiles(runner);
+
+     runner.run();
+
+     // one flow file on each success relationship, nothing anywhere else
+     runner.assertTransferCount(Config.REL_SUCCESS_REQ, 1);
+     runner.assertTransferCount(Config.REL_SUCCESS_RESP, 1);
+     runner.assertTransferCount(Config.REL_RETRY, 0);
+     runner.assertTransferCount(Config.REL_NO_RETRY, 0);
+     runner.assertTransferCount(Config.REL_FAILURE, 0);
+
+     // request side: original payload and attribute, plus the status attributes
+     final MockFlowFile requestFlowFile = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0);
+     requestFlowFile.assertAttributeEquals(Config.STATUS_CODE, "200");
+     requestFlowFile.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
+     final String requestContent = new String(requestFlowFile.toByteArray(), StandardCharsets.UTF_8);
+     Assert.assertEquals("Hello", requestContent);
+     requestFlowFile.assertAttributeEquals("Foo", "Bar");
+
+     // response side: empty content, status attributes, no Content-Type
+     final MockFlowFile responseFlowFile = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0);
+     responseFlowFile.assertContentEquals("".getBytes("UTF-8"));
+     responseFlowFile.assertAttributeEquals(Config.STATUS_CODE, "200");
+     responseFlowFile.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
+     responseFlowFile.assertAttributeEquals("Foo", "Bar");
+     responseFlowFile.assertAttributeNotExists("Content-Type");
+
+     final String responseContent = new String(responseFlowFile.toByteArray(), StandardCharsets.UTF_8);
+     Assert.assertEquals("", responseContent);
+ }
+
+ /**
+  * Pointing the URL at port 445 on localhost instead of the test server's URL is expected to
+  * route the flow file to the failure relationship with its content and attributes unchanged.
+  */
+ @Test
+ public void testConnectFailBadPort() throws Exception {
+     addHandler(new GetOrHeadHandler());
+
+     // deliberately not the test server's address
+     final String wrongPortUrl = "https://localhost:" + 445;
+     runner.setProperty(Config.PROP_URL, wrongPortUrl + "/doesnotExist");
+     createFlowFiles(runner);
+
+     runner.run();
+
+     // exactly one flow file is transferred, and only to "failure"
+     runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0);
+     runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0);
+     runner.assertTransferCount(Config.REL_RETRY, 0);
+     runner.assertTransferCount(Config.REL_NO_RETRY, 0);
+     runner.assertTransferCount(Config.REL_FAILURE, 1);
+
+     final MockFlowFile failed = runner.getFlowFilesForRelationship(Config.REL_FAILURE).get(0);
+
+     // payload and attribute set by createFlowFiles are still there
+     final String content = new String(failed.toByteArray(), StandardCharsets.UTF_8);
+     Assert.assertEquals("Hello", content);
+     failed.assertAttributeEquals("Foo", "Bar");
+ }
+
+ // @Test
+ // public void testGetFlowfileAttributes() throws IOException {
+ // Map<String, List<String>> input = new HashMap<>();
+ // input.put("A", Arrays.asList("1"));
+ // input.put("B", Arrays.asList("1", "2", "3"));
+ // input.put("C", new ArrayList<String>());
+ // input.put("D", null);
+ //
+ // Map<String, String> expected = new HashMap<>();
+ // expected.put(Config.STATUS_CODE, "200");
+ // expected.put(Config.STATUS_MESSAGE, "OK");
+ // expected.put(Config.STATUS_LINE, "HTTP/1.1 200 OK");
+ // expected.put("A", "1");
+ // expected.put("B", "1, 2, 3");
+ //
+ // URL url = new URL("file:/dev/null");
+ // HttpURLConnection conn = new MockHttpURLConnection(url, 200, "OK", input);
+ //
+ // Map<String, String> actual = processor.getAttributesFromHeaders(conn);
+ //
+ // assertEquals(expected, actual);
+ // }
+ // @Test
+ // public void testCsv() {
+ // // null input should return an empty string
+ // assertEquals("", processor.csv(null));
+ //
+ // // empty collection returns empty string
+ // assertEquals("", processor.csv(new ArrayList<String>()));
+ //
+ // // pretty normal checks
+ // assertEquals("1", processor.csv(Arrays.asList("1")));
+ // assertEquals("1, 2", processor.csv(Arrays.asList("1", "2")));
+ // assertEquals("1, 2, 3", processor.csv(Arrays.asList("1", "2", "3")));
+ //
+ // // values should be trimmed
+ // assertEquals("1, 2, 3", processor.csv(Arrays.asList(" 1", " 2 ", "3 ")));
+ //
+ // // empty values should be skipped
+ // assertEquals("1, 3", processor.csv(Arrays.asList("1", "", "3")));
+ //
+ // // whitespace values should be skipped
+ // assertEquals("1, 3", processor.csv(Arrays.asList("1", " ", "3")));
+ //
+ // // this (mis)behavior is currently expected, embedded comma delimiters are not escaped
+ // // note the embedded unescaped comma in the "1, " value
+ // assertEquals("1,, 2, 3", processor.csv(Arrays.asList("1, ", "2", "3")));
+ // }
+ /**
+  * Pointing the URL at the misspelled host name "localhOOst" is expected to route the flow file
+  * to the failure relationship with its content and attributes unchanged.
+  */
+ @Test
+ public void testConnectFailBadHost() throws Exception {
+     addHandler(new GetOrHeadHandler());
+
+     // deliberately misspelled host name
+     final String wrongHostUrl = "https://localhOOst:" + 445;
+     runner.setProperty(Config.PROP_URL, wrongHostUrl + "/doesnotExist");
+     createFlowFiles(runner);
+
+     runner.run();
+
+     // exactly one flow file is transferred, and only to "failure"
+     runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0);
+     runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0);
+     runner.assertTransferCount(Config.REL_RETRY, 0);
+     runner.assertTransferCount(Config.REL_NO_RETRY, 0);
+     runner.assertTransferCount(Config.REL_FAILURE, 1);
+
+     final MockFlowFile failed = runner.getFlowFilesForRelationship(Config.REL_FAILURE).get(0);
+
+     // payload and attribute set by createFlowFiles are still there
+     final String content = new String(failed.toByteArray(), StandardCharsets.UTF_8);
+     Assert.assertEquals("Hello", content);
+     failed.assertAttributeEquals("Foo", "Bar");
+ }
+
+ /**
+  * Builds the SSL settings used by the test server, keyed by the names of the
+  * {@link StandardSSLContextService} property descriptors.
+  *
+  * @return a mutable map pointing at the JKS keystore and truststore under src/test/resources
+  */
+ private static Map<String, String> createSslProperties() {
+     final Map<String, String> sslSettings = new HashMap<>();
+
+     // keystore
+     sslSettings.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
+     sslSettings.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
+     sslSettings.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
+
+     // truststore
+     sslSettings.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
+     sslSettings.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
+     sslSettings.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
+
+     return sslSettings;
+ }
+
+ /**
+  * Creates a {@code TestServer} from the class-level {@code sslProperties}.
+  *
+  * @return the newly constructed server
+  * @throws IOException declared by this factory for its callers
+  */
+ private static TestServer createServer() throws IOException {
+     final TestServer sslServer = new TestServer(sslProperties);
+     return sslServer;
+ }
+
+ /**
+  * Enqueues the single flow file every test in this class works with: content "Hello", a
+  * MIME type attribute and the custom attribute Foo=Bar.
+  *
+  * @param testRunner the runner whose input queue receives the flow file
+  * @throws UnsupportedEncodingException if the "UTF-8" charset name is not supported
+  */
+ private static void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException {
+     final Map<String, String> flowFileAttributes = new HashMap<>();
+     flowFileAttributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
+     flowFileAttributes.put("Foo", "Bar");
+
+     final byte[] payload = "Hello".getBytes("UTF-8");
+     testRunner.enqueue(payload, flowFileAttributes);
+ }
+
+ /**
+  * Server-side handler for the POST/PUT tests: marks the request handled and asserts that it was
+  * sent to "/post" and that the first line of its body is "Hello". It sets no status, headers
+  * or body on the response itself.
+  */
+ private static class PostHandler extends AbstractHandler {
+
+     @Override
+     public void handle(String target, Request baseRequest,
+             HttpServletRequest request, HttpServletResponse response)
+             throws IOException, ServletException {
+
+         baseRequest.setHandled(true);
+
+         // the tests always send to this path
+         assertEquals("/post", target);
+
+         // only the first line of the request body is inspected
+         final String firstBodyLine = request.getReader().readLine();
+         assertEquals("Hello", firstBodyLine);
+     }
+ }
+
+ /**
+  * Server-side handler for the GET/HEAD tests. The request path is expected to look like
+  * "/status/NNN"; NNN becomes the response status. The response is declared as text/plain with
+  * a content length equal to the path's length, and for GET requests the path itself is written
+  * as the body. Other methods (e.g. HEAD) get headers only.
+  */
+ private static class GetOrHeadHandler extends AbstractHandler {
+
+     @Override
+     public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+         baseRequest.setHandled(true);
+
+         // everything after "/status/" is the status code to answer with
+         final String requestedStatus = target.substring("/status".length() + 1);
+         response.setStatus(Integer.parseInt(requestedStatus));
+
+         response.setContentType("text/plain");
+         response.setContentLength(target.length());
+
+         // only GET gets a body
+         if (!"GET".equalsIgnoreCase(request.getMethod())) {
+             return;
+         }
+
+         try (PrintWriter body = response.getWriter()) {
+             body.print(target);
+             body.flush();
+         }
+     }
+
+ }
+
+ /**
+  * Server-side handler that stores the "Date" header of the request it handles in a field and
+  * answers 200 with a short plain-text body. No reader of the stored header is visible in this
+  * part of the file.
+  */
+ private static class DateHandler extends AbstractHandler {
+
+     private String dateString;
+
+     @Override
+     public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+         baseRequest.setHandled(true);
+
+         // keep the header value sent by the client
+         this.dateString = request.getHeader("Date");
+
+         response.setStatus(200);
+         response.setContentType("text/plain");
+
+         final PrintWriter body = response.getWriter();
+         body.println("Way to go!");
+     }
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
index 0000000,0f962d0..19d95b3
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@@ -1,0 -1,107 +1,107 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.distributed.cache.server;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.List;
+
++import org.apache.nifi.annotation.lifecycle.OnShutdown;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.controller.AbstractControllerService;
+ import org.apache.nifi.controller.ConfigurationContext;
+ import org.apache.nifi.controller.annotation.OnConfigured;
-import org.apache.nifi.processor.annotation.OnShutdown;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.ssl.SSLContextService;
+
+ /**
+  * Base controller service that owns a {@link CacheServer}.
+  * <p>
+  * It declares the configuration properties (port, SSL context service, cache size, eviction
+  * strategy and optional persistence directory), starts the server from the {@code @OnConfigured}
+  * callback and stops it from the {@code @OnShutdown} callback. Subclasses decide which kind of
+  * {@link CacheServer} to run by implementing {@link #createCacheServer(ConfigurationContext)}.
+  */
+ public abstract class DistributedCacheServer extends AbstractControllerService {
+     public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used";
+     public static final String EVICTION_STRATEGY_LRU = "Least Recently Used";
+     public static final String EVICTION_STRATEGY_FIFO = "First In, First Out";
+
+     public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+             .name("Port")
+             .description("The port to listen on for incoming connections")
+             .required(true)
+             .addValidator(StandardValidators.PORT_VALIDATOR)
+             .defaultValue("4557")
+             .build();
+     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+             .name("SSL Context Service")
+             .description(
+                     "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure")
+             .required(false)
+             .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+             .build();
+     public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder()
+             .name("Maximum Cache Entries")
+             .description("The maximum number of cache entries that the cache can hold")
+             .required(true)
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .defaultValue("10000")
+             .build();
+     public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder()
+             .name("Eviction Strategy")
+             .description("Determines which strategy should be used to evict values from the cache to make room for new entries")
+             .required(true)
+             .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO)
+             .defaultValue(EVICTION_STRATEGY_LFU)
+             .build();
+     public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder()
+             .name("Persistence Directory")
+             .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only")
+             .required(false)
+             .addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
+             .build();
+
+     /** The running server, or {@code null} while the service is stopped. */
+     private volatile CacheServer cacheServer;
+
+     /**
+      * Lists the supported properties. The SSL Context Service descriptor is rebuilt on every
+      * call so that its allowable values are the identifiers currently returned by the
+      * controller service lookup.
+      *
+      * @return a new list of the supported property descriptors
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(PORT);
+         properties.add(MAX_CACHE_ENTRIES);
+         properties.add(EVICTION_POLICY);
+         properties.add(PERSISTENCE_PATH);
+         properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues(
+                 getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build());
+         return properties;
+     }
+
+     /**
+      * Creates and starts the cache server unless one is already installed.
+      *
+      * @param context the configuration to create the server from
+      * @throws IOException if the server cannot be started
+      */
+     @OnConfigured
+     public void startServer(final ConfigurationContext context) throws IOException {
+         if (cacheServer == null) {
+             cacheServer = createCacheServer(context);
+             cacheServer.start();
+         }
+     }
+
+     /**
+      * Stops the cache server, if there is one, and forgets it.
+      *
+      * @throws IOException if stopping the server fails; the server reference is cleared even
+      *         then, so a later {@link #startServer(ConfigurationContext)} creates a fresh server
+      */
+     @OnShutdown
+     public void shutdownServer() throws IOException {
+         // Detach the server before stopping it: if stop() throws, the field must not keep
+         // pointing at the old instance, otherwise startServer() would skip creating a new one.
+         final CacheServer server = cacheServer;
+         cacheServer = null;
+         if (server != null) {
+             server.stop();
+         }
+     }
+
+     /**
+      * Last-resort cleanup: stops the server if the service is garbage collected while it is
+      * still running.
+      */
+     @Override
+     protected void finalize() throws Throwable {
+         try {
+             shutdownServer();
+         } finally {
+             // always give the superclass its chance to clean up, even if shutdown failed
+             super.finalize();
+         }
+     }
+
+     /**
+      * Creates the concrete server for the given configuration. Called from
+      * {@link #startServer(ConfigurationContext)}, which starts the returned server.
+      *
+      * @param context the configuration to read settings from
+      * @return the server to run
+      */
+     protected abstract CacheServer createCacheServer(ConfigurationContext context);
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
index 0000000,d7aae16..68f83d4
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
@@@ -1,0 -1,354 +1,354 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.ssl;
+
+ import java.io.File;
+ import java.net.MalformedURLException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.List;
+ import java.util.Map;
+
+ import javax.net.ssl.SSLContext;
+
++import org.apache.nifi.annotation.lifecycle.OnEnabled;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.controller.AbstractControllerService;
+ import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.annotation.OnConfigured;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.reporting.InitializationException;
+ import org.apache.nifi.security.util.CertificateUtils;
+ import org.apache.nifi.security.util.KeystoreType;
+ import org.apache.nifi.security.util.SslContextFactory;
+
+ /**
+  * {@link SSLContextService} implementation that is configured through keystore and truststore
+  * filename, password and type properties.
+  * <p>
+  * The configuration handed to {@link #onConfigured(ConfigurationContext)} is stored and read by
+  * {@link #createSSLContext(ClientAuth)} and the getters. Validation in
+  * {@link #customValidate(ValidationContext)} works on the values of the validation context it
+  * is given and does not depend on that stored configuration.
+  */
+ public class StandardSSLContextService extends AbstractControllerService implements SSLContextService {
+
+     public static final String STORE_TYPE_JKS = "JKS";
+     public static final String STORE_TYPE_PKCS12 = "PKCS12";
+
+     public static final PropertyDescriptor TRUSTSTORE = new PropertyDescriptor.Builder()
+             .name("Truststore Filename")
+             .description("The fully-qualified filename of the Truststore")
+             .defaultValue(null)
+             .addValidator(createFileExistsAndReadableValidator())
+             .sensitive(false)
+             .build();
+     public static final PropertyDescriptor TRUSTSTORE_TYPE = new PropertyDescriptor.Builder()
+             .name("Truststore Type")
+             .description("The Type of the Truststore. Either JKS or PKCS12")
+             .allowableValues(STORE_TYPE_JKS, STORE_TYPE_PKCS12)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .defaultValue(STORE_TYPE_JKS)
+             .sensitive(false)
+             .build();
+     public static final PropertyDescriptor TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder()
+             .name("Truststore Password")
+             .description("The password for the Truststore")
+             .defaultValue(null)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .sensitive(true)
+             .build();
+     public static final PropertyDescriptor KEYSTORE = new PropertyDescriptor.Builder()
+             .name("Keystore Filename")
+             .description("The fully-qualified filename of the Keystore")
+             .defaultValue(null)
+             .addValidator(createFileExistsAndReadableValidator())
+             .sensitive(false)
+             .build();
+     public static final PropertyDescriptor KEYSTORE_TYPE = new PropertyDescriptor.Builder()
+             .name("Keystore Type")
+             .description("The Type of the Keystore")
+             .allowableValues(STORE_TYPE_JKS, STORE_TYPE_PKCS12)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .defaultValue(STORE_TYPE_JKS)
+             .sensitive(false)
+             .build();
+     public static final PropertyDescriptor KEYSTORE_PASSWORD = new PropertyDescriptor.Builder()
+             .name("Keystore Password")
+             .defaultValue(null)
+             .description("The password for the Keystore")
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .sensitive(true)
+             .build();
+
+     private static final List<PropertyDescriptor> properties;
+
+     static {
+         List<PropertyDescriptor> props = new ArrayList<>();
+         props.add(KEYSTORE);
+         props.add(KEYSTORE_PASSWORD);
+         props.add(KEYSTORE_TYPE);
+         props.add(TRUSTSTORE);
+         props.add(TRUSTSTORE_PASSWORD);
+         props.add(TRUSTSTORE_TYPE);
+         properties = Collections.unmodifiableList(props);
+     }
+
+     /** Configuration received in {@link #onConfigured(ConfigurationContext)}; null until then. */
+     private ConfigurationContext configContext;
+
+     /**
+      * Stores the configuration, re-validates both stores and verifies that an SSLContext can
+      * actually be created from it.
+      *
+      * @param context the configuration to use from now on
+      * @throws InitializationException if a store is misconfigured or neither store is populated
+      * @throws ProcessException if the SSLContext cannot be created from the configured stores
+      */
- @OnConfigured
++ @OnEnabled
+     public void onConfigured(final ConfigurationContext context) throws InitializationException {
+         configContext = context;
+
+         final Collection<ValidationResult> results = new ArrayList<>();
+         results.addAll(validateStore(context.getProperties(), KeystoreValidationGroup.KEYSTORE));
+         results.addAll(validateStore(context.getProperties(), KeystoreValidationGroup.TRUSTSTORE));
+
+         if (!results.isEmpty()) {
+             final StringBuilder sb = new StringBuilder(this + " is not valid due to:");
+             for (final ValidationResult result : results) {
+                 sb.append("\n").append(result.toString());
+             }
+             throw new InitializationException(sb.toString());
+         }
+
+         if (countNulls(context.getProperty(KEYSTORE).getValue(),
+                 context.getProperty(KEYSTORE_PASSWORD).getValue(),
+                 context.getProperty(KEYSTORE_TYPE).getValue(),
+                 context.getProperty(TRUSTSTORE).getValue(),
+                 context.getProperty(TRUSTSTORE_PASSWORD).getValue(),
+                 context.getProperty(TRUSTSTORE_TYPE).getValue()) >= 4) {
+             throw new InitializationException(this + " does not have the KeyStore or the TrustStore populated");
+         }
+
+         // verify that the filename, password, and type match
+         createSSLContext(ClientAuth.REQUIRED);
+     }
+
+     /**
+      * Creates the validator used for the keystore and truststore filename properties: the value
+      * is run through Expression Language evaluation and must then name an existing, readable file.
+      *
+      * @return a new validator instance
+      */
+     private static Validator createFileExistsAndReadableValidator() {
+         return new Validator() {
+             // Not using the FILE_EXISTS_VALIDATOR because the default is to
+             // allow expression language
+             @Override
+             public ValidationResult validate(String subject, String input, ValidationContext context) {
+                 final String substituted;
+                 try {
+                     substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
+                 } catch (final Exception e) {
+                     return new ValidationResult.Builder()
+                             .subject(subject)
+                             .input(input)
+                             .valid(false)
+                             .explanation("Not a valid Expression Language value: " + e.getMessage())
+                             .build();
+                 }
+
+                 final File file = new File(substituted);
+                 final boolean valid = file.exists() && file.canRead();
+                 final String explanation = valid ? null : "File " + file + " does not exist or cannot be read";
+                 return new ValidationResult.Builder()
+                         .subject(subject)
+                         .input(input)
+                         .valid(valid)
+                         .explanation(explanation)
+                         .build();
+             }
+         };
+     }
+
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+
+     /**
+      * Validates the keystore and truststore settings of the given validation context.
+      * <p>
+      * Each store is checked with {@link #validateStore(Map, KeystoreValidationGroup)}; at least
+      * one store must be populated; and, if nothing else is wrong, an SSLContext is built from the
+      * values under validation to prove that filename, password and type fit together.
+      *
+      * @param validationContext supplies the property values to validate
+      * @return the problems found; empty if the configuration is valid
+      */
+     @Override
+     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+         final Collection<ValidationResult> results = new ArrayList<>();
+         results.addAll(validateStore(validationContext.getProperties(), KeystoreValidationGroup.KEYSTORE));
+         results.addAll(validateStore(validationContext.getProperties(), KeystoreValidationGroup.TRUSTSTORE));
+
+         final String keystoreFile = validationContext.getProperty(KEYSTORE).getValue();
+         final String keystorePassword = validationContext.getProperty(KEYSTORE_PASSWORD).getValue();
+         final String keystoreType = validationContext.getProperty(KEYSTORE_TYPE).getValue();
+         final String truststoreFile = validationContext.getProperty(TRUSTSTORE).getValue();
+         final String truststorePassword = validationContext.getProperty(TRUSTSTORE_PASSWORD).getValue();
+         final String truststoreType = validationContext.getProperty(TRUSTSTORE_TYPE).getValue();
+
+         if (countNulls(keystoreFile, keystorePassword, keystoreType,
+                 truststoreFile, truststorePassword, truststoreType) >= 4) {
+             results.add(new ValidationResult.Builder()
+                     .subject(this.getClass().getSimpleName() + " : " + getIdentifier())
+                     .valid(false)
+                     .explanation("Does not have the KeyStore or the TrustStore populated")
+                     .build());
+         }
+         if (results.isEmpty()) {
+             // Verify that the filename, password, and type match. This must use the values being
+             // validated, not the stored configContext, which is null before onConfigured has run
+             // and otherwise still holds the previously applied configuration.
+             try {
+                 buildSslContext(keystoreFile, keystorePassword, keystoreType,
+                         truststoreFile, truststorePassword, truststoreType, ClientAuth.REQUIRED);
+             } catch (ProcessException e) {
+                 results.add(new ValidationResult.Builder()
+                         .subject(getClass().getSimpleName() + " : " + getIdentifier())
+                         .valid(false)
+                         .explanation(e.getMessage())
+                         .build());
+             }
+         }
+         return results;
+     }
+
+     /**
+      * Creates an SSLContext from the configuration stored by
+      * {@link #onConfigured(ConfigurationContext)}.
+      *
+      * @param clientAuth client authentication mode; only applied when both a keystore and a
+      *        truststore are configured
+      * @return the new SSLContext
+      * @throws ProcessException if the service has not been configured yet or the context cannot
+      *         be created; the underlying exception is the cause
+      */
+     @Override
+     public SSLContext createSSLContext(final ClientAuth clientAuth) throws ProcessException {
+         final ConfigurationContext context = configContext;
+         if (context == null) {
+             throw new ProcessException(new IllegalStateException(this + " has not been configured; cannot create an SSLContext"));
+         }
+
+         return buildSslContext(
+                 context.getProperty(KEYSTORE).getValue(),
+                 context.getProperty(KEYSTORE_PASSWORD).getValue(),
+                 context.getProperty(KEYSTORE_TYPE).getValue(),
+                 context.getProperty(TRUSTSTORE).getValue(),
+                 context.getProperty(TRUSTSTORE_PASSWORD).getValue(),
+                 context.getProperty(TRUSTSTORE_TYPE).getValue(),
+                 clientAuth);
+     }
+
+     /**
+      * Builds an SSLContext from explicit store settings: truststore-only when no keystore file is
+      * given, keystore-only when no truststore file is given, otherwise both together with the
+      * requested client authentication mode.
+      *
+      * @throws ProcessException wrapping any failure, including missing passwords
+      */
+     private static SSLContext buildSslContext(final String keystoreFile, final String keystorePassword, final String keystoreType,
+             final String truststoreFile, final String truststorePassword, final String truststoreType,
+             final ClientAuth clientAuth) throws ProcessException {
+         try {
+             if (keystoreFile == null) {
+                 return SslContextFactory.createTrustSslContext(
+                         truststoreFile,
+                         truststorePassword.toCharArray(),
+                         truststoreType);
+             }
+             if (truststoreFile == null) {
+                 return SslContextFactory.createSslContext(
+                         keystoreFile,
+                         keystorePassword.toCharArray(),
+                         keystoreType);
+             }
+
+             return SslContextFactory.createSslContext(
+                     keystoreFile,
+                     keystorePassword.toCharArray(),
+                     keystoreType,
+                     truststoreFile,
+                     truststorePassword.toCharArray(),
+                     truststoreType,
+                     org.apache.nifi.security.util.SslContextFactory.ClientAuth.valueOf(clientAuth.name()));
+         } catch (final Exception e) {
+             throw new ProcessException(e);
+         }
+     }
+
+     @Override
+     public String getTrustStoreFile() {
+         return configContext.getProperty(TRUSTSTORE).getValue();
+     }
+
+     @Override
+     public String getTrustStoreType() {
+         return configContext.getProperty(TRUSTSTORE_TYPE).getValue();
+     }
+
+     @Override
+     public String getTrustStorePassword() {
+         return configContext.getProperty(TRUSTSTORE_PASSWORD).getValue();
+     }
+
+     /** @return true if truststore filename, password and type are all set */
+     @Override
+     public boolean isTrustStoreConfigured() {
+         return getTrustStoreFile() != null && getTrustStorePassword() != null && getTrustStoreType() != null;
+     }
+
+     @Override
+     public String getKeyStoreFile() {
+         return configContext.getProperty(KEYSTORE).getValue();
+     }
+
+     @Override
+     public String getKeyStoreType() {
+         return configContext.getProperty(KEYSTORE_TYPE).getValue();
+     }
+
+     @Override
+     public String getKeyStorePassword() {
+         return configContext.getProperty(KEYSTORE_PASSWORD).getValue();
+     }
+
+     /** @return true if keystore filename, password and type are all set */
+     @Override
+     public boolean isKeyStoreConfigured() {
+         return getKeyStoreFile() != null && getKeyStorePassword() != null && getKeyStoreType() != null;
+     }
+
+     /**
+      * Validates one store's three properties (filename, password, type) taken from the map.
+      * Either all three or none must be set; when all are set, the file must be readable and
+      * {@link CertificateUtils#isStoreValid} must accept it with the given type and password.
+      *
+      * @param properties the property values to check
+      * @param keyStoreOrTrustStore which store's properties to look at
+      * @return the problems found; empty if the store settings are acceptable
+      */
+     private static Collection<ValidationResult> validateStore(final Map<PropertyDescriptor, String> properties,
+             final KeystoreValidationGroup keyStoreOrTrustStore) {
+         final Collection<ValidationResult> results = new ArrayList<>();
+
+         final String filename;
+         final String password;
+         final String type;
+
+         if (keyStoreOrTrustStore == KeystoreValidationGroup.KEYSTORE) {
+             filename = properties.get(KEYSTORE);
+             password = properties.get(KEYSTORE_PASSWORD);
+             type = properties.get(KEYSTORE_TYPE);
+         } else {
+             filename = properties.get(TRUSTSTORE);
+             password = properties.get(TRUSTSTORE_PASSWORD);
+             type = properties.get(TRUSTSTORE_TYPE);
+         }
+
+         final String keystoreDesc = (keyStoreOrTrustStore == KeystoreValidationGroup.KEYSTORE) ? "Keystore" : "Truststore";
+
+         final int nulls = countNulls(filename, password, type);
+         if (nulls != 3 && nulls != 0) {
+             results.add(new ValidationResult.Builder().valid(false).explanation("Must set either 0 or 3 properties for " + keystoreDesc)
+                     .subject(keystoreDesc + " Properties").build());
+         } else if (nulls == 0) {
+             // all properties were filled in.
+             final File file = new File(filename);
+             if (!file.exists() || !file.canRead()) {
+                 results.add(new ValidationResult.Builder()
+                         .valid(false)
+                         .subject(keystoreDesc + " Properties")
+                         .explanation("Cannot access file " + file.getAbsolutePath())
+                         .build());
+             } else {
+                 try {
+                     final boolean storeValid = CertificateUtils
+                             .isStoreValid(file.toURI().toURL(), KeystoreType.valueOf(type), password.toCharArray());
+                     if (!storeValid) {
+                         results.add(new ValidationResult.Builder()
+                                 .subject(keystoreDesc + " Properties")
+                                 .valid(false)
+                                 .explanation("Invalid KeyStore Password or Type specified for file " + filename)
+                                 .build());
+                     }
+                 } catch (MalformedURLException e) {
+                     results.add(new ValidationResult.Builder()
+                             .subject(keystoreDesc + " Properties")
+                             .valid(false)
+                             .explanation("Malformed URL from file: " + e)
+                             .build());
+                 }
+             }
+         }
+
+         return results;
+     }
+
+     /**
+      * Counts how many of the given references are null.
+      *
+      * @param objects the values to inspect
+      * @return the number of null entries
+      */
+     private static int countNulls(Object... objects) {
+         int count = 0;
+         for (final Object x : objects) {
+             if (x == null) {
+                 count++;
+             }
+         }
+
+         return count;
+     }
+
+     /** Selects which group of store properties {@code validateStore} examines. */
+     public static enum KeystoreValidationGroup {
+
+         KEYSTORE, TRUSTSTORE
+     }
+
+     @Override
+     public String toString() {
+         return "SSLContextService[id=" + getIdentifier() + "]";
+     }
+ }