You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2016/12/30 15:38:06 UTC

nifi git commit: NIFI-3266 Added EL support for basePath and port in ListenHTTP

Repository: nifi
Updated Branches:
  refs/heads/master 0d14db72f -> 55f4716f3


NIFI-3266 Added EL support for basePath and port in ListenHTTP

This closes #1373.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/55f4716f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/55f4716f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/55f4716f

Branch: refs/heads/master
Commit: 55f4716f3d2bed47f422251c97155abcf197a480
Parents: 0d14db7
Author: Davy De Waele <dd...@gmail.com>
Authored: Thu Dec 29 23:24:21 2016 +0100
Committer: Pierre Villard <pi...@gmail.com>
Committed: Fri Dec 30 16:35:46 2016 +0100

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenHTTP.java    |  37 ++--
 .../processors/standard/TestListenHTTP.java     | 175 +++++++++++++++++++
 2 files changed, 194 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/55f4716f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 3e8576d..82eee92 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -16,22 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
-
-import javax.servlet.Servlet;
-import javax.ws.rs.Path;
-
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -64,6 +48,21 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 
+import javax.servlet.Servlet;
+import javax.ws.rs.Path;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"ingest", "http", "https", "rest", "listen"})
 @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener")
@@ -81,6 +80,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         .name("Base Path")
         .description("Base path for incoming connections")
         .required(true)
+        .expressionLanguageSupported(true)
         .defaultValue("contentListener")
         .addValidator(StandardValidators.URI_VALIDATOR)
         .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
@@ -89,6 +89,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         .name("Listening Port")
         .description("The Port to listen on for incoming connections")
         .required(true)
+        .expressionLanguageSupported(true)
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
         .build();
     public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
@@ -196,7 +197,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     }
 
     private void createHttpServerFromService(final ProcessContext context) throws Exception {
-        final String basePath = context.getProperty(BASE_PATH).getValue();
+        final String basePath = context.getProperty(BASE_PATH).evaluateAttributeExpressions().getValue();
         final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
         final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
         final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
@@ -232,7 +233,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         final Server server = new Server(threadPool);
 
         // get the configured port
-        final int port = context.getProperty(PORT).asInteger();
+        final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
 
         final ServerConnector connector;
         final HttpConfiguration httpConfiguration = new HttpConfiguration();

http://git-wip-us.apache.org/repos/asf/nifi/blob/55f4716f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
new file mode 100644
index 0000000..020535b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.ServerSocket;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.nifi.processors.standard.ListenHTTP.RELATIONSHIP_SUCCESS;
+import static org.junit.Assert.fail;
+
+public class TestListenHTTP {
+
+    private static final String HTTP_POST_METHOD = "POST";
+    private static final String HTTP_BASE_PATH = "basePath";
+
+    private final static String PORT_VARIABLE = "HTTP_PORT";
+    private final static String HTTP_SERVER_PORT_EL = "${" + PORT_VARIABLE + "}";
+
+    private final static String BASEPATH_VARIABLE = "HTTP_BASEPATH";
+    private final static String HTTP_SERVER_BASEPATH_EL = "${" + BASEPATH_VARIABLE + "}";
+
+    private ListenHTTP proc;
+    private TestRunner runner;
+
+    private int availablePort;
+
+    // Creates a fresh processor and runner and exposes the free port and base path as variables.
+    @Before
+    public void setup() throws IOException {
+        availablePort = findAvailablePort();
+        proc = new ListenHTTP();
+        runner = TestRunners.newTestRunner(proc);
+        runner.setVariable(BASEPATH_VARIABLE, HTTP_BASE_PATH);
+        runner.setVariable(PORT_VARIABLE, String.valueOf(availablePort));
+    }
+
+    // Port and base path are given as literal values; every POST must end up as a FlowFile.
+    @Test
+    public void testPOSTRequestsReceivedWithoutEL() throws Exception {
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+        runner.setProperty(ListenHTTP.PORT, String.valueOf(availablePort));
+
+        testPOSTRequestsReceived();
+    }
+
+    // Port and base path are given as expressions resolved from the variables set in setup().
+    @Test
+    public void testPOSTRequestsReceivedWithEL() throws Exception {
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_SERVER_BASEPATH_EL);
+        runner.setProperty(ListenHTTP.PORT, HTTP_SERVER_PORT_EL);
+
+        testPOSTRequestsReceived();
+    }
+
+    // POSTs the message (empty body when null) and returns the HTTP status; always releases the connection.
+    private int executePOST(String message) throws Exception {
+        final URL url = new URL("http://localhost:" + availablePort + "/" + HTTP_BASE_PATH);
+        final HttpURLConnection con = (HttpURLConnection) url.openConnection();
+        try {
+            con.setRequestMethod(HTTP_POST_METHOD);
+            con.setDoOutput(true);
+            try (DataOutputStream wr = new DataOutputStream(con.getOutputStream())) { // closed even if the write fails
+                if (message != null) {
+                    wr.writeBytes(message);
+                }
+            }
+            return con.getResponseCode();
+        } finally {
+            con.disconnect();
+        }
+    }
+    // Sends four payloads (one empty, one null) and checks the content of each resulting FlowFile.
+    private void testPOSTRequestsReceived() throws Exception {
+        final List<String> payloads = new ArrayList<>();
+        payloads.add("payload 1");
+        payloads.add("");
+        payloads.add(null);
+        payloads.add("payload 2");
+        startWebServerAndSendMessages(payloads);
+
+        runner.assertTransferCount(RELATIONSHIP_SUCCESS, 4);
+        final List<MockFlowFile> received = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS);
+        for (int i = 0; i < payloads.size(); i++) {
+            // a null message is posted with an empty body, so it is expected as empty content
+            final String sent = payloads.get(i);
+            received.get(i).assertContentEquals(sent == null ? "" : sent);
+        }
+    }
+
+    // Starts the server, then triggers the processor while a background thread POSTs the messages.
+    private void startWebServerAndSendMessages(final List<String> messages) throws Exception {
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.createHttpServer(context);
+        final Throwable[] senderFailure = new Throwable[1]; // JUnit ignores failures raised off the test thread
+        final Thread sender = new Thread(() -> {
+            try {
+                for (final String message : messages) {
+                    if (executePOST(message) != 200) {
+                        fail("HTTP POST failed.");
+                    }
+                }
+            } catch (Throwable t) {
+                senderFailure[0] = t; // recorded here, rethrown on the test thread below
+            }
+        });
+        sender.start();
+        final long responseTimeout = 10000;
+        final long startTime = System.currentTimeMillis();
+        int numTransferred = 0;
+        while (numTransferred < messages.size() && (System.currentTimeMillis() - startTime < responseTimeout)) {
+            proc.onTrigger(context, processSessionFactory);
+            numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
+            Thread.sleep(100);
+        }
+        sender.join(responseTimeout); // a completed join also makes the recorded failure visible here
+        if (senderFailure[0] != null) {
+            throw new AssertionError("Not expecting error here.", senderFailure[0]);
+        }
+        runner.assertTransferCount(RELATIONSHIP_SUCCESS, messages.size()); // ListenHTTP's relationship, not ListenTCP's
+    }
+
+    private int findAvailablePort() throws IOException {
+        try (ServerSocket socket = new ServerSocket()) {
+            socket.setReuseAddress(true); // only well-defined when set before the socket is bound
+            socket.bind(null); // null endpoint: the system picks a free ephemeral port
+            return socket.getLocalPort();
+        }
+    }
+    // Registers and enables an SSL context service on the test stores and sets it via ListenHTTP's own descriptor.
+    private SSLContextService configureProcessorSslContextService() throws InitializationException {
+        final SSLContextService sslContextService = new StandardSSLContextService();
+        runner.addControllerService("ssl-context", sslContextService);
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+        runner.enableControllerService(sslContextService);
+        runner.setProperty(ListenHTTP.SSL_CONTEXT_SERVICE, "ssl-context");
+        return sslContextService;
+    }
+
+}