You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/10/11 19:42:00 UTC

[1/2] nifi git commit: NIFI-1490: multipart/form-data support for ListenHTTP processor - introducing a in-memory-file-size-threashold, above which the incoming file is written to local disk - using java.io.tmpdir for such file writes - enhancing document

Repository: nifi
Updated Branches:
  refs/heads/master 8398ea77b -> 5aa426358


NIFI-1490: multipart/form-data support for ListenHTTP processor
- introducing a in-memory-file-size-threashold, above which the incoming file is written to local disk
- using java.io.tmpdir for such file writes
- enhancing documentation


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c81a1351
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c81a1351
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c81a1351

Branch: refs/heads/master
Commit: c81a135161c7af106117dcc616706798c4564669
Parents: 8398ea7
Author: Endre Zoltan Kovacs <an...@protonmail.com>
Authored: Thu Sep 6 17:33:33 2018 +0200
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Oct 11 15:41:19 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenHTTP.java    |  29 +-
 .../standard/servlets/ListenHTTPServlet.java    | 329 ++++++++++++-------
 .../processors/standard/TestListenHTTP.java     | 174 ++++++++--
 3 files changed, 392 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c81a1351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 209e6d1..39c91db 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -134,6 +134,25 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         .defaultValue(String.valueOf(HttpServletResponse.SC_OK))
         .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
         .build();
+    public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+        .name("max-request-size")
+        .displayName("Max Request Size")
+        .description("The max size of the request. Only applies for requests with Content-Type: multipart/form-data, "
+                + "and is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("1 MB")
+        .build();
+    public static final PropertyDescriptor IN_MEMORY_FILE_SIZE_THRESHOLD = new PropertyDescriptor.Builder()
+        .name("in-memory-file-size-threshold")
+        .displayName("The threshold size, at which the contents of an incoming file would be written to disk. "
+                + "Only applies for requests with Content-Type: multipart/form-data. "
+                + "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.")
+        .description("The threshold value for writing a file to disk.")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("512 KB")
+        .build();
 
     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
     public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
@@ -145,6 +164,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
     public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
     public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode";
+    public static final String CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE = "maxRequestSize";
+    public static final String CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD = "inMemoryFileSizeThreshold";
 
     private volatile Server server = null;
     private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>();
@@ -166,6 +187,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         descriptors.add(MAX_UNCONFIRMED_TIME);
         descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
         descriptors.add(RETURN_CODE);
+        descriptors.add(MAX_REQUEST_SIZE);
+        descriptors.add(IN_MEMORY_FILE_SIZE_THRESHOLD);
         this.properties = Collections.unmodifiableList(descriptors);
     }
 
@@ -214,6 +237,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
         final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
         final int returnCode = context.getProperty(RETURN_CODE).asInteger();
+        long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue();
+        int inMemoryFileSizeThreshold = context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue();
         throttlerRef.set(streamThrottler);
 
         final boolean needClientAuth = sslContextService != null && sslContextService.getTrustStoreFile() != null;
@@ -295,7 +320,9 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
-        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE,returnCode);
+        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode);
+        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE, maxRequestSize);
+        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD, inMemoryFileSizeThreshold);
 
         if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
             contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81a1351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 85de6f8..8803f0b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -20,10 +20,12 @@ import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.security.cert.X509Certificate;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -34,14 +36,18 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.zip.GZIPInputStream;
+
+import javax.servlet.MultipartConfigElement;
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.Part;
 import javax.ws.rs.Path;
 import javax.ws.rs.core.MediaType;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.flowfile.FlowFile;
@@ -54,10 +60,15 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processors.standard.ListenHTTP;
 import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
 import org.apache.nifi.stream.io.StreamThrottler;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FlowFileUnpackager;
 import org.apache.nifi.util.FlowFileUnpackagerV1;
 import org.apache.nifi.util.FlowFileUnpackagerV2;
 import org.apache.nifi.util.FlowFileUnpackagerV3;
+import org.eclipse.jetty.server.Request;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 
 
 @Path("")
@@ -94,6 +105,8 @@ public class ListenHTTPServlet extends HttpServlet {
     private StreamThrottler streamThrottler;
     private String basePath;
     private int returnCode;
+    private long maxRequestSize;
+    private int inMemoryFileSizeThreshold;
 
     @SuppressWarnings("unchecked")
     @Override
@@ -108,6 +121,8 @@ public class ListenHTTPServlet extends HttpServlet {
         this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
         this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH);
         this.returnCode = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE);
+        this.maxRequestSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE);
+        this.inMemoryFileSizeThreshold = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD);
     }
 
     @Override
@@ -133,8 +148,6 @@ public class ListenHTTPServlet extends HttpServlet {
         } while (sessionFactory == null);
 
         final ProcessSession session = sessionFactory.createSession();
-        FlowFile flowFile = null;
-        String holdUuid = null;
         String foundSubject = null;
         try {
             final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE;
@@ -191,134 +204,212 @@ public class ListenHTTPServlet extends HttpServlet {
                 logger.debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped);
             }
 
-            final AtomicBoolean hasMoreData = new AtomicBoolean(false);
-            final FlowFileUnpackager unpackager;
-            if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
-                unpackager = new FlowFileUnpackagerV3();
-            } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
-                unpackager = new FlowFileUnpackagerV2();
-            } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
-                unpackager = new FlowFileUnpackagerV1();
+            Set<FlowFile> flowFileSet = null;
+            if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains("multipart/form-data")) {
+              flowFileSet = handleMultipartRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in);
             } else {
-                unpackager = null;
+              flowFileSet = handleRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in);
             }
+            proceedFlow(request, response, session, foundSubject, createHold, flowFileSet);
+        } catch (final Throwable t) {
+            handleException(request, response, session, foundSubject, t);
+        }
+    }
 
-            final Set<FlowFile> flowFileSet = new HashSet<>();
-
-            do {
-                final long startNanos = System.nanoTime();
-                final Map<String, String> attributes = new HashMap<>();
-                flowFile = session.create();
-                flowFile = session.write(flowFile, new OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream rawOut) throws IOException {
-                        try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
-                            if (unpackager == null) {
-                                IOUtils.copy(in, bos);
-                                hasMoreData.set(false);
-                            } else {
-                                attributes.putAll(unpackager.unpackageFlowFile(in, bos));
-
-                                if (destinationIsLegacyNiFi) {
-                                    if (attributes.containsKey("nf.file.name")) {
-                                        // for backward compatibility with old nifi...
-                                        attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
-                                    }
-
-                                    if (attributes.containsKey("nf.file.path")) {
-                                        attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
-                                    }
-                                }
-
-                                hasMoreData.set(unpackager.hasMoreData());
-                            }
-                        }
-                    }
-                });
-
-                final long transferNanos = System.nanoTime() - startNanos;
-                final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+    private void handleException(final HttpServletRequest request, final HttpServletResponse response,
+        final ProcessSession session, String foundSubject, final Throwable t) throws IOException {
+      session.rollback();
+      logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
+      response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
+    }
 
-                // put metadata on flowfile
-                final String nameVal = request.getHeader(CoreAttributes.FILENAME.key());
-                if (StringUtils.isNotBlank(nameVal)) {
-                    attributes.put(CoreAttributes.FILENAME.key(), nameVal);
-                }
+    private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject,
+        boolean destinationIsLegacyNiFi, String contentType, InputStream in) throws IOException, ServletException {
+      Set<FlowFile> flowFileSet = new HashSet<>();
+      String tempDir = System.getProperty("java.io.tmpdir");
+      request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, maxRequestSize, maxRequestSize, inMemoryFileSizeThreshold));
+      List<Part> requestParts = ImmutableList.copyOf(request.getParts());
+      for (int i = 0; i < requestParts.size(); i++) {
+        Part part = requestParts.get(i);
+        FlowFile flowFile = session.create();
+        try (OutputStream flowFileOoutputStream = session.write(flowFile)) {
+          StreamUtils.copy(part.getInputStream(), flowFileOoutputStream);
+        }
+        flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
+        flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size());
+        flowFileSet.add(flowFile);
+      }
+      return flowFileSet;
+    }
 
-                // put arbitrary headers on flow file
-                for (Enumeration<String> headerEnum = request.getHeaderNames();
-                        headerEnum.hasMoreElements();) {
-                    String headerName = headerEnum.nextElement();
-                    if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
-                        String headerValue = request.getHeader(headerName);
-                        attributes.put(headerName, headerValue);
-                    }
-                }
+    private FlowFile savePartDetailsAsAttributes(ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) {
+      Map<String, String> attributes = new HashMap<>();
+      for (String headerName : part.getHeaderNames()) {
+        final String headerValue = part.getHeader(headerName);
+        putAttribute(attributes, "http.headers.multipart." + headerName, headerValue);
+      }
+      putAttribute(attributes, "http.multipart.size", part.getSize());
+      putAttribute(attributes, "http.multipart.content.type", part.getContentType());
+      putAttribute(attributes, "http.multipart.name", part.getName());
+      putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName());
+      putAttribute(attributes, "http.multipart.fragments.sequence.number", i+1);
+      putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount);
+      return session.putAllAttributes(flowFile, attributes);
+    }
 
-                String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
-                if (sourceSystemFlowFileIdentifier != null) {
-                    sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier;
+    private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session,
+        String foundSubject, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
+      FlowFile flowFile = null;
+      String holdUuid = null;
+      final AtomicBoolean hasMoreData = new AtomicBoolean(false);
+      final FlowFileUnpackager unpackager;
+      if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
+          unpackager = new FlowFileUnpackagerV3();
+      } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
+          unpackager = new FlowFileUnpackagerV2();
+      } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
+          unpackager = new FlowFileUnpackagerV1();
+      } else {
+          unpackager = null;
+      }
+
+      final Set<FlowFile> flowFileSet = new HashSet<>();
+
+      do {
+          final long startNanos = System.nanoTime();
+          final Map<String, String> attributes = new HashMap<>();
+          flowFile = session.create();
+          flowFile = session.write(flowFile, new OutputStreamCallback() {
+              @Override
+              public void process(final OutputStream rawOut) throws IOException {
+                  try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
+                      if (unpackager == null) {
+                          IOUtils.copy(in, bos);
+                          hasMoreData.set(false);
+                      } else {
+                          attributes.putAll(unpackager.unpackageFlowFile(in, bos));
+
+                          if (destinationIsLegacyNiFi) {
+                              if (attributes.containsKey("nf.file.name")) {
+                                  // for backward compatibility with old nifi...
+                                  attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
+                              }
+
+                              if (attributes.containsKey("nf.file.path")) {
+                                  attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
+                              }
+                          }
+
+                          hasMoreData.set(unpackager.hasMoreData());
+                      }
+                  }
+              }
+          });
+
+          final long transferNanos = System.nanoTime() - startNanos;
+          final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+          // put metadata on flowfile
+          final String nameVal = request.getHeader(CoreAttributes.FILENAME.key());
+          if (StringUtils.isNotBlank(nameVal)) {
+              attributes.put(CoreAttributes.FILENAME.key(), nameVal);
+          }
+
+          String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
+          if (sourceSystemFlowFileIdentifier != null) {
+              sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier;
+
+              // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's
+              // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event
+              attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+          }
+
+          flowFile = session.putAllAttributes(flowFile, attributes);
+          flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
+          session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
+          flowFileSet.add(flowFile);
+
+          if (holdUuid == null) {
+              holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+          }
+      } while (hasMoreData.get());
+      return flowFileSet;
+    }
 
-                    // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's
-                    // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event
-                    attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
-                }
+    protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session,
+        String foundSubject, FlowFile flowFile) {
+      Map<String, String> attributes = new HashMap<>();
+      addMatchingRequestHeaders(request, attributes);
+      flowFile = session.putAllAttributes(flowFile, attributes);
+      flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
+      flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
+      flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
+      return flowFile;
+    }
 
-                flowFile = session.putAllAttributes(flowFile, attributes);
-                session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
-                flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
-                flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
-                flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
-                flowFileSet.add(flowFile);
+    private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) {
+      // put arbitrary headers on flow file
+      for (Enumeration<String> headerEnum = request.getHeaderNames();
+              headerEnum.hasMoreElements();) {
+          String headerName = headerEnum.nextElement();
+          if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
+              String headerValue = request.getHeader(headerName);
+              attributes.put(headerName, headerValue);
+          }
+      }
+    }
 
-                if (holdUuid == null) {
-                    holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
-                }
-            } while (hasMoreData.get());
+    protected void proceedFlow(final HttpServletRequest request, final HttpServletResponse response,
+        final ProcessSession session, String foundSubject, final boolean createHold,
+        final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException {
+      if (createHold) {
+          String uuid = UUID.randomUUID().toString();
+
+          if (flowFileMap.containsKey(uuid)) {
+              uuid = UUID.randomUUID().toString();
+          }
+
+          final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost());
+          FlowFileEntryTimeWrapper previousWrapper;
+          do {
+            previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
+              if (previousWrapper != null) {
+                  uuid = UUID.randomUUID().toString();
+              }
+          } while (previousWrapper != null);
+
+          response.setStatus(HttpServletResponse.SC_SEE_OTHER);
+          final String ackUri =  "/" + basePath + "/holds/" + uuid;
+          response.addHeader(LOCATION_HEADER_NAME, ackUri);
+          response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
+          response.getOutputStream().write(ackUri.getBytes("UTF-8"));
+          if (logger.isDebugEnabled()) {
+              logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}",
+                      new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
+          }
+      } else {
+          response.setStatus(this.returnCode);
+          logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success'",
+                  new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject});
+
+          session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
+          session.commit();
+      }
+    }
 
-            if (createHold) {
-                String uuid = (holdUuid == null) ? UUID.randomUUID().toString() : holdUuid;
+    private void putAttribute(final Map<String, String> map, final String key, final Object value) {
+      if (value == null) {
+          return;
+      }
 
-                if (flowFileMap.containsKey(uuid)) {
-                    uuid = UUID.randomUUID().toString();
-                }
+      putAttribute(map, key, value.toString());
+  }
 
-                final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost());
-                FlowFileEntryTimeWrapper previousWrapper;
-                do {
-                    previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
-                    if (previousWrapper != null) {
-                        uuid = UUID.randomUUID().toString();
-                    }
-                } while (previousWrapper != null);
-
-                response.setStatus(HttpServletResponse.SC_SEE_OTHER);
-                final String ackUri =  "/" + basePath + "/holds/" + uuid;
-                response.addHeader(LOCATION_HEADER_NAME, ackUri);
-                response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
-                response.getOutputStream().write(ackUri.getBytes("UTF-8"));
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}",
-                            new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
-                }
-            } else {
-                response.setStatus(this.returnCode);
-                logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}",
-                        new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFile});
+    private void putAttribute(final Map<String, String> map, final String key, final String value) {
+      if (value == null) {
+          return;
+      }
 
-                session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
-                session.commit();
-            }
-        } catch (final Throwable t) {
-            session.rollback();
-            if (flowFile == null) {
-                logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}",
-                        new Object[]{request.getRemoteHost(), foundSubject, t});
-            } else {
-                logger.error("Unable to receive file {} from Remote Host: [{}] SubjectDN [{}] due to {}",
-                        new Object[]{flowFile, request.getRemoteHost(), foundSubject, t});
-            }
-            response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
-        }
-    }
+      map.put(key, value);
+  }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81a1351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index 117a068..f8e9015 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -27,15 +27,31 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLContext;
@@ -66,7 +82,7 @@ public class TestListenHTTP {
     public void setup() throws IOException {
         proc = new ListenHTTP();
         runner = TestRunners.newTestRunner(proc);
-        availablePort = NetworkUtils.availablePort();;
+        availablePort = NetworkUtils.availablePort();
         runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort));
         runner.setVariable(BASEPATH_VARIABLE, HTTP_BASE_PATH);
 
@@ -181,8 +197,8 @@ public class TestListenHTTP {
     private int executePOST(String message) throws Exception {
         final SSLContextService sslContextService = runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, SSLContextService.class);
         final boolean secure = (sslContextService != null);
-        final String scheme = secure ? "https" : "http";
-        final URL url = new URL(scheme + "://localhost:" + availablePort + "/" + HTTP_BASE_PATH);
+        String endpointUrl = buildUrl(secure);
+        final URL url = new URL(endpointUrl);
         HttpURLConnection connection;
 
         if (secure) {
@@ -207,6 +223,10 @@ public class TestListenHTTP {
         return connection.getResponseCode();
     }
 
+    private String buildUrl(final boolean secure) {
+      return String.format("%s://localhost:%s/%s", secure ? "https" : "http" , availablePort,  HTTP_BASE_PATH);
+    }
+
     private void testPOSTRequestsReceived(int returnCode) throws Exception {
         final List<String> messages = new ArrayList<>();
         messages.add("payload 1");
@@ -225,13 +245,29 @@ public class TestListenHTTP {
         mockFlowFiles.get(3).assertContentEquals("payload 2");
     }
 
+    private void startWebServerAndSendRequests(Runnable sendRequestToWebserver, int numberOfExpectedFlowFiles, int returnCode) throws Exception {
+      final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+      final ProcessContext context = runner.getProcessContext();
+      proc.createHttpServer(context);
+
+      new Thread(sendRequestToWebserver).start();
+
+      long responseTimeout = 10000;
+
+      int numTransferred = 0;
+      long startTime = System.currentTimeMillis();
+      while (numTransferred < numberOfExpectedFlowFiles && (System.currentTimeMillis() - startTime < responseTimeout)) {
+          proc.onTrigger(context, processSessionFactory);
+          numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
+          Thread.sleep(100);
+      }
+
+      runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, numberOfExpectedFlowFiles);
+    }
+
     private void startWebServerAndSendMessages(final List<String> messages, int returnCode)
             throws Exception {
 
-        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
-        final ProcessContext context = runner.getProcessContext();
-        proc.createHttpServer(context);
-
         Runnable sendMessagestoWebServer = () -> {
             try {
                 for (final String message : messages) {
@@ -244,20 +280,8 @@ public class TestListenHTTP {
                 fail("Not expecting error here.");
             }
         };
-        new Thread(sendMessagestoWebServer).start();
-
-        long responseTimeout = 10000;
-
-        int numTransferred = 0;
-        long startTime = System.currentTimeMillis();
-        while (numTransferred < messages.size() && (System.currentTimeMillis() - startTime < responseTimeout)) {
-            proc.onTrigger(context, processSessionFactory);
-            numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
-            Thread.sleep(100);
-        }
-
-        runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, messages.size());
 
+        startWebServerAndSendRequests(sendMessagestoWebServer, messages.size(), returnCode);
     }
 
     private SSLContextService configureProcessorSslContextService() throws InitializationException {
@@ -287,4 +311,114 @@ public class TestListenHTTP {
         runner.setProperty(ListenHTTP.SSL_CONTEXT_SERVICE, SSL_CONTEXT_SERVICE_IDENTIFIER);
         return sslContextService;
     }
+
+
+    @Test(/*timeout=10000*/)
+    public void testMultipartFormDataRequest() throws Exception {
+
+      runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+      runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+      runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_OK));
+
+      final SSLContextService sslContextService = runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, SSLContextService.class);
+      final boolean isSecure = (sslContextService != null);
+
+      Runnable sendRequestToWebserver = () -> {
+        try {
+          MultipartBody multipartBody = new MultipartBody.Builder().setType(MultipartBody.FORM)
+              .addFormDataPart("p1", "v1")
+              .addFormDataPart("p2", "v2")
+              .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+              .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }")))
+              .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+              .build();
+
+          Request request =
+              new Request.Builder()
+                .url(buildUrl(isSecure))
+                .post(multipartBody)
+                .build();
+
+          int timeout = 3000;
+          OkHttpClient client = new OkHttpClient.Builder()
+                .readTimeout(timeout, TimeUnit.MILLISECONDS)
+                .writeTimeout(timeout, TimeUnit.MILLISECONDS)
+                .build();
+
+          try (Response response = client.newCall(request).execute()) {
+            Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful());
+          }
+        } catch (final Throwable t) {
+          t.printStackTrace();
+          Assert.fail(t.toString());
+        }
+      };
+
+
+      startWebServerAndSendRequests(sendRequestToWebserver, 5, 200);
+
+      runner.assertAllFlowFilesTransferred(ListenHTTP.RELATIONSHIP_SUCCESS, 5);
+      List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ListenHTTP.RELATIONSHIP_SUCCESS);
+      // Part fragments are not processed in the order we submitted them.
+      // We cannot rely on the order we sent them in.
+      MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p1");
+      mff.assertAttributeEquals("http.multipart.name", "p1");
+      mff.assertAttributeExists("http.multipart.size");
+      mff.assertAttributeEquals("http.multipart.fragments.sequence.number", "1");
+      mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+      mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+      mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p2");
+      mff.assertAttributeEquals("http.multipart.name", "p2");
+      mff.assertAttributeExists("http.multipart.size");
+      mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+      mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+      mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+      mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file1");
+      mff.assertAttributeEquals("http.multipart.name", "file1");
+      mff.assertAttributeEquals("http.multipart.filename", "my-file-text.txt");
+      mff.assertAttributeEquals("http.headers.multipart.content-type", "text/plain");
+      mff.assertAttributeExists("http.multipart.size");
+      mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+      mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+      mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+      mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file2");
+      mff.assertAttributeEquals("http.multipart.name", "file2");
+      mff.assertAttributeEquals("http.multipart.filename", "my-file-data.json");
+      mff.assertAttributeEquals("http.headers.multipart.content-type", "application/json");
+      mff.assertAttributeExists("http.multipart.size");
+      mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+      mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+      mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+      mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file3");
+      mff.assertAttributeEquals("http.multipart.name", "file3");
+      mff.assertAttributeEquals("http.multipart.filename", "my-file-binary.bin");
+      mff.assertAttributeEquals("http.headers.multipart.content-type", "application/octet-stream");
+      mff.assertAttributeExists("http.multipart.size");
+      mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+      mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+      mff.assertAttributeExists("http.headers.multipart.content-disposition");
+    }
+
+     private byte[] generateRandomBinaryData(int i) {
+      byte[] bytes = new byte[100];
+      new Random().nextBytes(bytes);
+      return bytes;
+    }
+     private File createTextFile(String fileName, String... lines) throws IOException {
+      File file = new File(fileName);
+      file.deleteOnExit();
+      for (String string : lines) {
+        Files.append(string, file, Charsets.UTF_8);
+      }
+      return file;
+    }
+     protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) {
+      Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship, ff -> ff.getAttribute(attributeName).equals(attributeValue));
+      Assert.assertTrue(optional.isPresent());
+      return optional.get();
+    }
 }


[2/2] nifi git commit: NIFI-1490: better field naming / displayname and description mix up fix

Posted by ma...@apache.org.
NIFI-1490: better field naming / displayname and description mix up fix

This closes #2994.

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5aa42635
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5aa42635
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5aa42635

Branch: refs/heads/master
Commit: 5aa426358802bfac91657fdb8a8a83094239ced8
Parents: c81a135
Author: Endre Zoltan Kovacs <ek...@hortonworks.com>
Authored: Mon Oct 8 13:10:37 2018 +0200
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Oct 11 15:41:38 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenHTTP.java    |  30 +-
 .../standard/servlets/ListenHTTPServlet.java    | 420 +++++++++----------
 2 files changed, 223 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa42635/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 39c91db..5ea9f3a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -134,21 +134,21 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         .defaultValue(String.valueOf(HttpServletResponse.SC_OK))
         .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
         .build();
-    public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
-        .name("max-request-size")
-        .displayName("Max Request Size")
+    public static final PropertyDescriptor MULTIPART_REQUEST_MAX_SIZE = new PropertyDescriptor.Builder()
+        .name("multipart-request-max-size")
+        .displayName("Multipart Request Max Size")
         .description("The max size of the request. Only applies for requests with Content-Type: multipart/form-data, "
                 + "and is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space")
         .required(true)
         .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
         .defaultValue("1 MB")
         .build();
-    public static final PropertyDescriptor IN_MEMORY_FILE_SIZE_THRESHOLD = new PropertyDescriptor.Builder()
-        .name("in-memory-file-size-threshold")
-        .displayName("The threshold size, at which the contents of an incoming file would be written to disk. "
+    public static final PropertyDescriptor MULTIPART_READ_BUFFER_SIZE = new PropertyDescriptor.Builder()
+        .name("multipart-read-buffer-size")
+        .displayName("Multipart Read Buffer Size")
+        .description("The threshold size, at which the contents of an incoming file would be written to disk. "
                 + "Only applies for requests with Content-Type: multipart/form-data. "
                 + "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.")
-        .description("The threshold value for writing a file to disk.")
         .required(true)
         .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
         .defaultValue("512 KB")
@@ -164,8 +164,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
     public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
     public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode";
-    public static final String CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE = "maxRequestSize";
-    public static final String CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD = "inMemoryFileSizeThreshold";
+    public static final String CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE = "multipartRequestMaxSize";
+    public static final String CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE = "multipartReadBufferSize";
 
     private volatile Server server = null;
     private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>();
@@ -187,8 +187,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         descriptors.add(MAX_UNCONFIRMED_TIME);
         descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
         descriptors.add(RETURN_CODE);
-        descriptors.add(MAX_REQUEST_SIZE);
-        descriptors.add(IN_MEMORY_FILE_SIZE_THRESHOLD);
+        descriptors.add(MULTIPART_REQUEST_MAX_SIZE);
+        descriptors.add(MULTIPART_READ_BUFFER_SIZE);
         this.properties = Collections.unmodifiableList(descriptors);
     }
 
@@ -237,8 +237,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
         final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
         final int returnCode = context.getProperty(RETURN_CODE).asInteger();
-        long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue();
-        int inMemoryFileSizeThreshold = context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue();
+        long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
+        int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         throttlerRef.set(streamThrottler);
 
         final boolean needClientAuth = sslContextService != null && sslContextService.getTrustStoreFile() != null;
@@ -321,8 +321,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode);
-        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE, maxRequestSize);
-        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD, inMemoryFileSizeThreshold);
+        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE, requestMaxSize);
+        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE, readBufferSize);
 
         if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
             contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa42635/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 8803f0b..07ccd69 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -16,6 +16,37 @@
  */
 package org.apache.nifi.processors.standard.servlets;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processors.standard.ListenHTTP;
+import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
+import org.apache.nifi.stream.io.StreamThrottler;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.FlowFileUnpackager;
+import org.apache.nifi.util.FlowFileUnpackagerV1;
+import org.apache.nifi.util.FlowFileUnpackagerV2;
+import org.apache.nifi.util.FlowFileUnpackagerV3;
+import org.eclipse.jetty.server.Request;
+
+import javax.servlet.MultipartConfigElement;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.Part;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.MediaType;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -37,40 +68,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.zip.GZIPInputStream;
 
-import javax.servlet.MultipartConfigElement;
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.Part;
-import javax.ws.rs.Path;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processors.standard.ListenHTTP;
-import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
-import org.apache.nifi.stream.io.StreamThrottler;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.FlowFileUnpackager;
-import org.apache.nifi.util.FlowFileUnpackagerV1;
-import org.apache.nifi.util.FlowFileUnpackagerV2;
-import org.apache.nifi.util.FlowFileUnpackagerV3;
-import org.eclipse.jetty.server.Request;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-
-
 @Path("")
 public class ListenHTTPServlet extends HttpServlet {
 
@@ -105,8 +102,8 @@ public class ListenHTTPServlet extends HttpServlet {
     private StreamThrottler streamThrottler;
     private String basePath;
     private int returnCode;
-    private long maxRequestSize;
-    private int inMemoryFileSizeThreshold;
+    private long multipartRequestMaxSize;
+    private int multipartReadBufferSize;
 
     @SuppressWarnings("unchecked")
     @Override
@@ -121,8 +118,8 @@ public class ListenHTTPServlet extends HttpServlet {
         this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
         this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH);
         this.returnCode = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE);
-        this.maxRequestSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE);
-        this.inMemoryFileSizeThreshold = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD);
+        this.multipartRequestMaxSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE);
+        this.multipartReadBufferSize = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE);
     }
 
     @Override
@@ -204,11 +201,11 @@ public class ListenHTTPServlet extends HttpServlet {
                 logger.debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped);
             }
 
-            Set<FlowFile> flowFileSet = null;
+            Set<FlowFile> flowFileSet;
             if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains("multipart/form-data")) {
-              flowFileSet = handleMultipartRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in);
+                flowFileSet = handleMultipartRequest(request, session, foundSubject);
             } else {
-              flowFileSet = handleRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in);
+                flowFileSet = handleRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in);
             }
             proceedFlow(request, response, session, foundSubject, createHold, flowFileSet);
         } catch (final Throwable t) {
@@ -217,199 +214,198 @@ public class ListenHTTPServlet extends HttpServlet {
     }
 
     private void handleException(final HttpServletRequest request, final HttpServletResponse response,
-        final ProcessSession session, String foundSubject, final Throwable t) throws IOException {
-      session.rollback();
-      logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
-      response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
+                                 final ProcessSession session, String foundSubject, final Throwable t) throws IOException {
+        session.rollback();
+        logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
+        response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
     }
 
-    private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject,
-        boolean destinationIsLegacyNiFi, String contentType, InputStream in) throws IOException, ServletException {
-      Set<FlowFile> flowFileSet = new HashSet<>();
-      String tempDir = System.getProperty("java.io.tmpdir");
-      request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, maxRequestSize, maxRequestSize, inMemoryFileSizeThreshold));
-      List<Part> requestParts = ImmutableList.copyOf(request.getParts());
-      for (int i = 0; i < requestParts.size(); i++) {
-        Part part = requestParts.get(i);
-        FlowFile flowFile = session.create();
-        try (OutputStream flowFileOoutputStream = session.write(flowFile)) {
-          StreamUtils.copy(part.getInputStream(), flowFileOoutputStream);
+    private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject) throws IOException, IllegalStateException, ServletException {
+        Set<FlowFile> flowFileSet = new HashSet<>();
+        String tempDir = System.getProperty("java.io.tmpdir");
+        request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize));
+        List<Part> requestParts = ImmutableList.copyOf(request.getParts());
+        for (int i = 0; i < requestParts.size(); i++) {
+            Part part = requestParts.get(i);
+            FlowFile flowFile = session.create();
+            try (OutputStream flowFileOutputStream = session.write(flowFile)) {
+                StreamUtils.copy(part.getInputStream(), flowFileOutputStream);
+            }
+            flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
+            flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size());
+            flowFileSet.add(flowFile);
         }
-        flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
-        flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size());
-        flowFileSet.add(flowFile);
-      }
-      return flowFileSet;
+        return flowFileSet;
     }
 
-    private FlowFile savePartDetailsAsAttributes(ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) {
-      Map<String, String> attributes = new HashMap<>();
-      for (String headerName : part.getHeaderNames()) {
-        final String headerValue = part.getHeader(headerName);
-        putAttribute(attributes, "http.headers.multipart." + headerName, headerValue);
-      }
-      putAttribute(attributes, "http.multipart.size", part.getSize());
-      putAttribute(attributes, "http.multipart.content.type", part.getContentType());
-      putAttribute(attributes, "http.multipart.name", part.getName());
-      putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName());
-      putAttribute(attributes, "http.multipart.fragments.sequence.number", i+1);
-      putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount);
-      return session.putAllAttributes(flowFile, attributes);
+    private FlowFile savePartDetailsAsAttributes(final ProcessSession session, final Part part, final FlowFile flowFile, final int sequenceNumber, final int allPartsCount) {
+        final Map<String, String> attributes = new HashMap<>();
+        for (String headerName : part.getHeaderNames()) {
+            final String headerValue = part.getHeader(headerName);
+            putAttribute(attributes, "http.headers.multipart." + headerName, headerValue);
+        }
+        putAttribute(attributes, "http.multipart.size", part.getSize());
+        putAttribute(attributes, "http.multipart.content.type", part.getContentType());
+        putAttribute(attributes, "http.multipart.name", part.getName());
+        putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName());
+        putAttribute(attributes, "http.multipart.fragments.sequence.number", sequenceNumber + 1);
+        putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount);
+        return session.putAllAttributes(flowFile, attributes);
     }
 
     private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session,
-        String foundSubject, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
-      FlowFile flowFile = null;
-      String holdUuid = null;
-      final AtomicBoolean hasMoreData = new AtomicBoolean(false);
-      final FlowFileUnpackager unpackager;
-      if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
-          unpackager = new FlowFileUnpackagerV3();
-      } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
-          unpackager = new FlowFileUnpackagerV2();
-      } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
-          unpackager = new FlowFileUnpackagerV1();
-      } else {
-          unpackager = null;
-      }
-
-      final Set<FlowFile> flowFileSet = new HashSet<>();
-
-      do {
-          final long startNanos = System.nanoTime();
-          final Map<String, String> attributes = new HashMap<>();
-          flowFile = session.create();
-          flowFile = session.write(flowFile, new OutputStreamCallback() {
-              @Override
-              public void process(final OutputStream rawOut) throws IOException {
-                  try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
-                      if (unpackager == null) {
-                          IOUtils.copy(in, bos);
-                          hasMoreData.set(false);
-                      } else {
-                          attributes.putAll(unpackager.unpackageFlowFile(in, bos));
-
-                          if (destinationIsLegacyNiFi) {
-                              if (attributes.containsKey("nf.file.name")) {
-                                  // for backward compatibility with old nifi...
-                                  attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
-                              }
-
-                              if (attributes.containsKey("nf.file.path")) {
-                                  attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
-                              }
-                          }
-
-                          hasMoreData.set(unpackager.hasMoreData());
-                      }
-                  }
-              }
-          });
-
-          final long transferNanos = System.nanoTime() - startNanos;
-          final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-
-          // put metadata on flowfile
-          final String nameVal = request.getHeader(CoreAttributes.FILENAME.key());
-          if (StringUtils.isNotBlank(nameVal)) {
-              attributes.put(CoreAttributes.FILENAME.key(), nameVal);
-          }
-
-          String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
-          if (sourceSystemFlowFileIdentifier != null) {
-              sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier;
-
-              // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's
-              // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event
-              attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
-          }
-
-          flowFile = session.putAllAttributes(flowFile, attributes);
-          flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
-          session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
-          flowFileSet.add(flowFile);
-
-          if (holdUuid == null) {
-              holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
-          }
-      } while (hasMoreData.get());
-      return flowFileSet;
+                                        String foundSubject, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
+        FlowFile flowFile = null;
+        String holdUuid = null;
+        final AtomicBoolean hasMoreData = new AtomicBoolean(false);
+        final FlowFileUnpackager unpackager;
+        if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
+            unpackager = new FlowFileUnpackagerV3();
+        } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
+            unpackager = new FlowFileUnpackagerV2();
+        } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
+            unpackager = new FlowFileUnpackagerV1();
+        } else {
+            unpackager = null;
+        }
+
+        final Set<FlowFile> flowFileSet = new HashSet<>();
+
+        do {
+            final long startNanos = System.nanoTime();
+            final Map<String, String> attributes = new HashMap<>();
+            flowFile = session.create();
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream rawOut) throws IOException {
+                    try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
+                        if (unpackager == null) {
+                            IOUtils.copy(in, bos);
+                            hasMoreData.set(false);
+                        } else {
+                            attributes.putAll(unpackager.unpackageFlowFile(in, bos));
+
+                            if (destinationIsLegacyNiFi) {
+                                if (attributes.containsKey("nf.file.name")) {
+                                    // for backward compatibility with old nifi...
+                                    attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
+                                }
+
+                                if (attributes.containsKey("nf.file.path")) {
+                                    attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
+                                }
+                            }
+
+                            hasMoreData.set(unpackager.hasMoreData());
+                        }
+                    }
+                }
+            });
+
+            final long transferNanos = System.nanoTime() - startNanos;
+            final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+            // put metadata on flowfile
+            final String nameVal = request.getHeader(CoreAttributes.FILENAME.key());
+            if (StringUtils.isNotBlank(nameVal)) {
+                attributes.put(CoreAttributes.FILENAME.key(), nameVal);
+            }
+
+            String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
+            if (sourceSystemFlowFileIdentifier != null) {
+                sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier;
+
+                // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's
+                // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event
+                attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+            }
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
+            session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
+            flowFileSet.add(flowFile);
+
+            if (holdUuid == null) {
+                holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+            }
+        } while (hasMoreData.get());
+        return flowFileSet;
     }
 
     protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session,
-        String foundSubject, FlowFile flowFile) {
-      Map<String, String> attributes = new HashMap<>();
-      addMatchingRequestHeaders(request, attributes);
-      flowFile = session.putAllAttributes(flowFile, attributes);
-      flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
-      flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
-      flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
-      return flowFile;
+                                                      String foundSubject, FlowFile flowFile) {
+        Map<String, String> attributes = new HashMap<>();
+        addMatchingRequestHeaders(request, attributes);
+        flowFile = session.putAllAttributes(flowFile, attributes);
+        flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
+        flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
+        flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
+        return flowFile;
     }
 
     private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) {
-      // put arbitrary headers on flow file
-      for (Enumeration<String> headerEnum = request.getHeaderNames();
-              headerEnum.hasMoreElements();) {
-          String headerName = headerEnum.nextElement();
-          if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
-              String headerValue = request.getHeader(headerName);
-              attributes.put(headerName, headerValue);
-          }
-      }
+        // put arbitrary headers on flow file
+        for (Enumeration<String> headerEnum = request.getHeaderNames();
+             headerEnum.hasMoreElements(); ) {
+            String headerName = headerEnum.nextElement();
+            if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
+                String headerValue = request.getHeader(headerName);
+                attributes.put(headerName, headerValue);
+            }
+        }
     }
 
     protected void proceedFlow(final HttpServletRequest request, final HttpServletResponse response,
-        final ProcessSession session, String foundSubject, final boolean createHold,
-        final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException {
-      if (createHold) {
-          String uuid = UUID.randomUUID().toString();
-
-          if (flowFileMap.containsKey(uuid)) {
-              uuid = UUID.randomUUID().toString();
-          }
-
-          final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost());
-          FlowFileEntryTimeWrapper previousWrapper;
-          do {
-            previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
-              if (previousWrapper != null) {
-                  uuid = UUID.randomUUID().toString();
-              }
-          } while (previousWrapper != null);
-
-          response.setStatus(HttpServletResponse.SC_SEE_OTHER);
-          final String ackUri =  "/" + basePath + "/holds/" + uuid;
-          response.addHeader(LOCATION_HEADER_NAME, ackUri);
-          response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
-          response.getOutputStream().write(ackUri.getBytes("UTF-8"));
-          if (logger.isDebugEnabled()) {
-              logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}",
-                      new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
-          }
-      } else {
-          response.setStatus(this.returnCode);
-          logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success'",
-                  new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject});
-
-          session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
-          session.commit();
-      }
+                               final ProcessSession session, String foundSubject, final boolean createHold,
+                               final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException {
+        if (createHold) {
+            String uuid = UUID.randomUUID().toString();
+
+            if (flowFileMap.containsKey(uuid)) {
+                uuid = UUID.randomUUID().toString();
+            }
+
+            final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost());
+            FlowFileEntryTimeWrapper previousWrapper;
+            do {
+                previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
+                if (previousWrapper != null) {
+                    uuid = UUID.randomUUID().toString();
+                }
+            } while (previousWrapper != null);
+
+            response.setStatus(HttpServletResponse.SC_SEE_OTHER);
+            final String ackUri = "/" + basePath + "/holds/" + uuid;
+            response.addHeader(LOCATION_HEADER_NAME, ackUri);
+            response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
+            response.getOutputStream().write(ackUri.getBytes("UTF-8"));
+            if (logger.isDebugEnabled()) {
+                logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}",
+                    new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
+            }
+        } else {
+            response.setStatus(this.returnCode);
+            logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success'",
+                new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject});
+
+            session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
+            session.commit();
+        }
     }
 
     private void putAttribute(final Map<String, String> map, final String key, final Object value) {
-      if (value == null) {
-          return;
-      }
+        if (value == null) {
+            return;
+        }
 
-      putAttribute(map, key, value.toString());
-  }
+        putAttribute(map, key, value.toString());
+    }
 
     private void putAttribute(final Map<String, String> map, final String key, final String value) {
-      if (value == null) {
-          return;
-      }
+        if (value == null) {
+            return;
+        }
 
-      map.put(key, value);
-  }
+        map.put(key, value);
+    }
 }