You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/10/30 15:29:28 UTC

[02/50] [abbrv] nifi git commit: NIFI-747 This closes #104. PR from Venkatesh Sellappa was modified, then code reviewed by Joe Witt (comments in ticket)

NIFI-747 This closes #104. PR from Venkatesh Sellappa <VS...@outlook.com> was modified, then code reviewed by Joe Witt (comments in ticket)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/88b1b844
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/88b1b844
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/88b1b844

Branch: refs/heads/NIFI-655
Commit: 88b1b844fba5e4dded6242bb17f2096ff4172ed3
Parents: ad73a23
Author: Tony Kurc <tr...@gmail.com>
Authored: Sun Oct 18 20:04:24 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sun Oct 18 20:04:24 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenHTTP.java    | 28 ++++++++++++++++----
 .../servlets/ContentAcknowledgmentServlet.java  |  3 +--
 .../standard/servlets/ListenHTTPServlet.java    |  7 +++--
 3 files changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/88b1b844/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index c7842d9..a446eb6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -63,7 +63,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 
 @Tags({"ingest", "http", "https", "rest", "listen"})
-@CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The URL of the Service will be http://{hostname}:{port}/contentListener")
+@CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener")
 public class ListenHTTP extends AbstractSessionFactoryProcessor {
 
     private Set<Relationship> relationships;
@@ -74,6 +74,14 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
             .description("Relationship for successfully received FlowFiles")
             .build();
 
+    public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder()
+            .name("Base Path")
+            .description("Base path for incoming connections")
+            .required(true)
+            .defaultValue("contentListener")
+            .addValidator(StandardValidators.URI_VALIDATOR)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
+            .build();
     public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
             .name("Listening Port")
             .description("The Port to listen on for incoming connections")
@@ -113,7 +121,6 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
             .required(false)
             .build();
 
-    public static final String URI = "/contentListener";
     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
     public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
     public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
@@ -122,6 +129,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
     public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
     public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
+    public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
 
     private volatile Server server = null;
     private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>();
@@ -134,6 +142,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         this.relationships = Collections.unmodifiableSet(relationships);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(BASE_PATH);
         descriptors.add(PORT);
         descriptors.add(MAX_DATA_RATE);
         descriptors.add(SSL_CONTEXT_SERVICE);
@@ -170,6 +179,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     }
 
     private void createHttpServerFromService(final ProcessContext context) throws Exception {
+        final String basePath = context.getProperty(BASE_PATH).getValue();
         final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
         final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
         final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
@@ -230,12 +240,17 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null));
         for (final Class<? extends Servlet> cls : getServerClasses()) {
             final Path path = cls.getAnnotation(Path.class);
-            if (path == null) {
-                contextHandler.addServlet(cls, "/*");
-            } else {
+            // Note: every servlet must have a Path annotation - path.value() below will throw an NPE otherwise.
+            // Also, servlets other than ListenHTTPServlet must have a path starting with /
+            if(basePath.isEmpty() && !path.value().isEmpty()){
+                // Note: this is to handle the condition of an empty uri, otherwise pathSpec would start with //
                 contextHandler.addServlet(cls, path.value());
             }
+            else{
+                contextHandler.addServlet(cls, "/" + basePath + path.value());
+            }
         }
+
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger());
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, sessionFactoryReference);
@@ -243,6 +258,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
+        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
 
         if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
             contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
@@ -259,6 +275,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
 
     protected Set<Class<? extends Servlet>> getServerClasses() {
         final Set<Class<? extends Servlet>> s = new HashSet<>();
+        // NOTE: Servlets added below MUST have a Path annotation.
+        // Any servlet other than ListenHTTPServlet must have a Path annotation value starting with /
         s.add(ListenHTTPServlet.class);
         s.add(ContentAcknowledgmentServlet.class);
         return s;

http://git-wip-us.apache.org/repos/asf/nifi/blob/88b1b844/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java
index 7dd6797..3252aea 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java
@@ -38,10 +38,9 @@ import org.apache.nifi.processors.standard.ListenHTTP;
 import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
 import org.apache.nifi.util.FormatUtils;
 
-@Path(ContentAcknowledgmentServlet.URI)
+@Path("/holds/*")
 public class ContentAcknowledgmentServlet extends HttpServlet {
 
-    public static final String URI = ListenHTTP.URI + "/holds/*";
     public static final String DEFAULT_FOUND_SUBJECT = "none";
     private static final long serialVersionUID = -2675148117984902978L;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/88b1b844/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 6a8f32f..79d3887 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -61,7 +61,8 @@ import org.apache.nifi.util.FlowFileUnpackagerV3;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 
-@Path(ListenHTTP.URI)
+
+@Path("")
 public class ListenHTTPServlet extends HttpServlet {
 
     private static final long serialVersionUID = 5329940480987723163L;
@@ -93,6 +94,7 @@ public class ListenHTTPServlet extends HttpServlet {
     private Pattern headerPattern;
     private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
     private StreamThrottler streamThrottler;
+    private String basePath;
 
     @SuppressWarnings("unchecked")
     @Override
@@ -105,6 +107,7 @@ public class ListenHTTPServlet extends HttpServlet {
         this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
         this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
         this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
+        this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH);
     }
 
     @Override
@@ -291,7 +294,7 @@ public class ListenHTTPServlet extends HttpServlet {
                 } while (previousWrapper != null);
 
                 response.setStatus(HttpServletResponse.SC_SEE_OTHER);
-                final String ackUri = ListenHTTP.URI + "/holds/" + uuid;
+                final String ackUri =  "/" + basePath + "/holds/" + uuid;
                 response.addHeader(LOCATION_HEADER_NAME, ackUri);
                 response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
                 response.getOutputStream().write(ackUri.getBytes("UTF-8"));