You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2016/12/30 15:38:06 UTC
nifi git commit: NIFI-3266 Added EL support for basePath and port in
ListenHTTP
Repository: nifi
Updated Branches:
refs/heads/master 0d14db72f -> 55f4716f3
NIFI-3266 Added EL support for basePath and port in ListenHTTP
This closes #1373.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/55f4716f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/55f4716f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/55f4716f
Branch: refs/heads/master
Commit: 55f4716f3d2bed47f422251c97155abcf197a480
Parents: 0d14db7
Author: Davy De Waele <dd...@gmail.com>
Authored: Thu Dec 29 23:24:21 2016 +0100
Committer: Pierre Villard <pi...@gmail.com>
Committed: Fri Dec 30 16:35:46 2016 +0100
----------------------------------------------------------------------
.../nifi/processors/standard/ListenHTTP.java | 37 ++--
.../processors/standard/TestListenHTTP.java | 175 +++++++++++++++++++
2 files changed, 194 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/55f4716f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 3e8576d..82eee92 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -16,22 +16,6 @@
*/
package org.apache.nifi.processors.standard;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
-
-import javax.servlet.Servlet;
-import javax.ws.rs.Path;
-
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -64,6 +48,21 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import javax.servlet.Servlet;
+import javax.ws.rs.Path;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"ingest", "http", "https", "rest", "listen"})
@CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener")
@@ -81,6 +80,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
.name("Base Path")
.description("Base path for incoming connections")
.required(true)
+ .expressionLanguageSupported(true)
.defaultValue("contentListener")
.addValidator(StandardValidators.URI_VALIDATOR)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
@@ -89,6 +89,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
.name("Listening Port")
.description("The Port to listen on for incoming connections")
.required(true)
+ .expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
@@ -196,7 +197,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
}
private void createHttpServerFromService(final ProcessContext context) throws Exception {
- final String basePath = context.getProperty(BASE_PATH).getValue();
+ final String basePath = context.getProperty(BASE_PATH).evaluateAttributeExpressions().getValue();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
@@ -232,7 +233,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
final Server server = new Server(threadPool);
// get the configured port
- final int port = context.getProperty(PORT).asInteger();
+ final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
final ServerConnector connector;
final HttpConfiguration httpConfiguration = new HttpConfiguration();
http://git-wip-us.apache.org/repos/asf/nifi/blob/55f4716f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
new file mode 100644
index 0000000..020535b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.ServerSocket;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.nifi.processors.standard.ListenHTTP.RELATIONSHIP_SUCCESS;
+import static org.junit.Assert.fail;
+
+/**
+ * Exercises {@link ListenHTTP} by POSTing a handful of payloads to a locally started
+ * server, once with the port and base path given as literal property values and once
+ * with both supplied through Expression Language variables.
+ */
+public class TestListenHTTP {
+
+    private static final String HTTP_POST_METHOD = "POST";
+    private static final String HTTP_BASE_PATH = "basePath";
+
+    private static final String PORT_VARIABLE = "HTTP_PORT";
+    private static final String HTTP_SERVER_PORT_EL = "${" + PORT_VARIABLE + "}";
+
+    private static final String BASEPATH_VARIABLE = "HTTP_BASEPATH";
+    private static final String HTTP_SERVER_BASEPATH_EL = "${" + BASEPATH_VARIABLE + "}";
+
+    // Upper bound, in milliseconds, on how long a test polls for the posted messages.
+    private static final long RESPONSE_TIMEOUT_MILLIS = 10000;
+
+    // Pause, in milliseconds, between two consecutive onTrigger polls.
+    private static final long POLL_INTERVAL_MILLIS = 100;
+
+    private ListenHTTP proc;
+    private TestRunner runner;
+
+    private int availablePort;
+
+    /**
+     * Creates a fresh processor and runner, picks a free local port, and registers the
+     * port and base path as runner variables so the EL-based test can resolve them.
+     *
+     * @throws IOException if no free port can be determined
+     */
+    // NOTE(review): nothing in this class stops the HTTP server started by the tests;
+    // consider adding an @After hook that shuts the processor's server down.
+    @Before
+    public void setup() throws IOException {
+        proc = new ListenHTTP();
+        runner = TestRunners.newTestRunner(proc);
+        availablePort = findAvailablePort();
+        runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort));
+        runner.setVariable(BASEPATH_VARIABLE, HTTP_BASE_PATH);
+    }
+
+    /** Port and base path are configured as plain literal values. */
+    @Test
+    public void testPOSTRequestsReceivedWithoutEL() throws Exception {
+        runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+
+        testPOSTRequestsReceived();
+    }
+
+    /** Port and base path are configured as Expression Language references to variables. */
+    @Test
+    public void testPOSTRequestsReceivedWithEL() throws Exception {
+        runner.setProperty(ListenHTTP.PORT, HTTP_SERVER_PORT_EL);
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_SERVER_BASEPATH_EL);
+
+        testPOSTRequestsReceived();
+    }
+
+    /**
+     * Sends a single HTTP POST to the server under test.
+     *
+     * @param message request body; {@code null} sends an empty body
+     * @return the HTTP status code of the response
+     * @throws Exception if the connection cannot be opened or written to
+     */
+    private int executePOST(String message) throws Exception {
+        final URL url = new URL("http://localhost:" + availablePort + "/" + HTTP_BASE_PATH);
+        final HttpURLConnection con = (HttpURLConnection) url.openConnection();
+        try {
+            con.setRequestMethod(HTTP_POST_METHOD);
+            con.setDoOutput(true);
+            // try-with-resources closes the stream even when the write fails
+            try (DataOutputStream wr = new DataOutputStream(con.getOutputStream())) {
+                if (message != null) {
+                    wr.writeBytes(message);
+                }
+                wr.flush();
+            }
+            return con.getResponseCode();
+        } finally {
+            // the response body is never read, so release the connection explicitly
+            con.disconnect();
+        }
+    }
+
+    /**
+     * Posts four messages (including an empty and a {@code null} one) and verifies that
+     * each arrives, in order, as a flow file on the success relationship.
+     */
+    private void testPOSTRequestsReceived() throws Exception {
+        final List<String> messages = new ArrayList<>();
+        messages.add("payload 1");
+        messages.add("");
+        messages.add(null);
+        messages.add("payload 2");
+
+        startWebServerAndSendMessages(messages);
+
+        final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS);
+
+        runner.assertTransferCount(RELATIONSHIP_SUCCESS, messages.size());
+        mockFlowFiles.get(0).assertContentEquals("payload 1");
+        mockFlowFiles.get(1).assertContentEquals("");
+        mockFlowFiles.get(2).assertContentEquals("");
+        mockFlowFiles.get(3).assertContentEquals("payload 2");
+    }
+
+    /**
+     * Starts the processor's HTTP server, posts the given messages from a background
+     * thread, and triggers the processor until every message has been transferred or
+     * the timeout elapses.
+     *
+     * <p>Any failure on the sender thread is captured and rethrown here, because an
+     * assertion raised on a background thread does not fail the JUnit test by itself.
+     *
+     * @param messages request bodies to post, in order
+     * @throws Exception if the server cannot be started or the wait is interrupted
+     */
+    private void startWebServerAndSendMessages(final List<String> messages)
+            throws Exception {
+
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.createHttpServer(context);
+
+        final AtomicReference<Throwable> senderFailure = new AtomicReference<>();
+        final Thread sender = new Thread(() -> {
+            try {
+                for (final String message : messages) {
+                    if (executePOST(message) != HttpURLConnection.HTTP_OK) {
+                        fail("HTTP POST failed.");
+                    }
+                }
+            } catch (Exception | AssertionError e) {
+                // hand the problem over to the test thread instead of losing it here
+                senderFailure.set(e);
+            }
+        });
+        sender.start();
+
+        final long startTime = System.currentTimeMillis();
+        final long deadline = startTime + RESPONSE_TIMEOUT_MILLIS;
+
+        int numTransferred = 0;
+        while (numTransferred < messages.size()
+                && senderFailure.get() == null
+                && System.currentTimeMillis() < deadline) {
+            proc.onTrigger(context, processSessionFactory);
+            numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
+            Thread.sleep(POLL_INTERVAL_MILLIS);
+        }
+
+        // join(0) would wait forever, so always wait at least one millisecond
+        sender.join(Math.max(1L, deadline - System.currentTimeMillis()));
+
+        final Throwable failure = senderFailure.get();
+        if (failure != null) {
+            throw new AssertionError("Not expecting error here.", failure);
+        }
+
+        runner.assertTransferCount(RELATIONSHIP_SUCCESS, messages.size());
+    }
+
+    /**
+     * Asks the operating system for a currently free ephemeral port.
+     *
+     * @return a port number that was free at the time of the call
+     * @throws IOException if the probe socket cannot be opened or bound
+     */
+    private int findAvailablePort() throws IOException {
+        try (ServerSocket socket = new ServerSocket()) {
+            // SO_REUSEADDR must be set before binding to have a defined effect
+            socket.setReuseAddress(true);
+            // a null address binds to an ephemeral port on a valid local address
+            socket.bind(null);
+            return socket.getLocalPort();
+        }
+    }
+
+    /**
+     * Registers and enables a {@link StandardSSLContextService} backed by the test
+     * keystore and truststore, and points the processor's SSL property at it.
+     *
+     * @return the enabled SSL context service
+     * @throws InitializationException if the controller service cannot be added
+     */
+    private SSLContextService configureProcessorSslContextService() throws InitializationException {
+        final SSLContextService sslContextService = new StandardSSLContextService();
+        runner.addControllerService("ssl-context", sslContextService);
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+        runner.enableControllerService(sslContextService);
+
+        runner.setProperty(ListenHTTP.SSL_CONTEXT_SERVICE, "ssl-context");
+        return sslContextService;
+    }
+
+}