You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/10/15 17:14:49 UTC

nifi git commit: NIFI-3469 multipart request support added to HandleHttpRequest - introducing an in-memory-file-size-threshold, above which the incoming file is written to local disk - using java.io.tmpdir for such file writes - enhancing documentation -

Repository: nifi
Updated Branches:
  refs/heads/master ce25ae541 -> d5f071b2f


NIFI-3469 multipart request support added to HandleHttpRequest
- introducing an in-memory-file-size-threshold, above which the incoming file is written to local disk
- using java.io.tmpdir for such file writes
- enhancing documentation
- documenting how to avoid premature HTTP response
- fix and UT for unsuccessful request registration

This closes #2991.

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d5f071b2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d5f071b2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d5f071b2

Branch: refs/heads/master
Commit: d5f071b2fe5240d4face9b7c8582ba45fd668294
Parents: ce25ae5
Author: Endre Zoltan Kovacs <an...@protonmail.com>
Authored: Tue Sep 4 16:35:37 2018 +0200
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Oct 15 13:14:31 2018 -0400

----------------------------------------------------------------------
 .../processors/standard/HandleHttpRequest.java  | 401 ++++++++++++-------
 .../additionalDetails.html                      |  13 +
 .../standard/TestHandleHttpRequest.java         | 225 +++++++++++
 3 files changed, 497 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d5f071b2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index ea27188..d03774e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -32,6 +32,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.http.HttpContextMap;
 import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -52,12 +53,18 @@ import org.eclipse.jetty.server.SslConnectionFactory;
 import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+
 import javax.servlet.AsyncContext;
 import javax.servlet.DispatcherType;
+import javax.servlet.MultipartConfigElement;
 import javax.servlet.ServletException;
 import javax.servlet.http.Cookie;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.Part;
 import javax.ws.rs.core.Response.Status;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -109,10 +116,27 @@ import java.util.regex.Pattern;
             + "This value will not be populated unless the Processor is configured to use an SSLContext Service"),
     @WritesAttribute(attribute = "http.headers.XXX", description = "Each of the HTTP Headers that is received in the request will be added as an "
             + "attribute, prefixed with \"http.headers.\" For example, if the request contains an HTTP Header named \"x-my-header\", then the value "
-            + "will be added to an attribute named \"http.headers.x-my-header\"")})
+            + "will be added to an attribute named \"http.headers.x-my-header\""),
+    @WritesAttribute(attribute = "http.headers.multipart.XXX", description = "Each of the HTTP Headers that is received in the mulipart request will be added as an "
+        + "attribute, prefixed with \"http.headers.multipart.\" For example, if the multipart request contains an HTTP Header named \"content-disposition\", then the value "
+        + "will be added to an attribute named \"http.headers.multipart.content-disposition\""),
+    @WritesAttribute(attribute = "http.multipart.size",
+        description = "For requests with Content-Type \"multipart/form-data\", the part's content size is recorded into this attribute"),
+    @WritesAttribute(attribute = "http.multipart.content.type",
+        description = "For requests with Content-Type \"multipart/form-data\", the part's content type is recorded into this attribute"),
+    @WritesAttribute(attribute = "http.multipart.name",
+        description = "For requests with Content-Type \"multipart/form-data\", the part's name is recorded into this attribute"),
+    @WritesAttribute(attribute = "http.multipart.filename",
+        description = "For requests with Content-Type \"multipart/form-data\", when the part contains an uploaded file, the name of the file is recorded into this attribute"),
+    @WritesAttribute(attribute = "http.multipart.fragments.sequence.number",
+        description = "For requests with Content-Type \"multipart/form-data\", the part's index is recorded into this attribute. The index starts with 1."),
+    @WritesAttribute(attribute = "http.multipart.fragments.total.number",
+      description = "For requests with Content-Type \"multipart/form-data\", the count of all parts is recorded into this attribute.")})
 @SeeAlso(value = {HandleHttpResponse.class})
 public class HandleHttpRequest extends AbstractProcessor {
 
+    private static final String MIME_TYPE__MULTIPART_FORM_DATA = "multipart/form-data";
+
     private static final Pattern URL_QUERY_PARAM_DELIMITER = Pattern.compile("&");
 
     // Allowable values for client auth
@@ -229,7 +253,25 @@ public class HandleHttpRequest extends AbstractProcessor {
             .name("container-queue-size").displayName("Container Queue Size")
             .description("The size of the queue for Http Request Containers").required(true)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("50").build();
-
+    public static final PropertyDescriptor MULTIPART_REQUEST_MAX_SIZE = new PropertyDescriptor.Builder()
+            .name("multipart-request-max-size")
+            .displayName("Multipart Request Max Size")
+            .description("The max size of the request. Only applies for requests with Content-Type: multipart/form-data, "
+                    + "and is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
+    public static final PropertyDescriptor MULTIPART_READ_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("multipart-read-buffer-size")
+            .description("The threshold size, at which the contents of an incoming file would be written to disk. "
+                    + "Only applies for requests with Content-Type: multipart/form-data. "
+                    + "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.")
+            .displayName("Multipart Read Buffer Size")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("512 KB")
+            .build();
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All content that is received is routed to the 'success' relationship")
@@ -254,6 +296,8 @@ public class HandleHttpRequest extends AbstractProcessor {
         descriptors.add(ADDITIONAL_METHODS);
         descriptors.add(CLIENT_AUTH);
         descriptors.add(CONTAINER_QUEUE_SIZE);
+        descriptors.add(MULTIPART_REQUEST_MAX_SIZE);
+        descriptors.add(MULTIPART_READ_BUFFER_SIZE);
         propertyDescriptors = Collections.unmodifiableList(descriptors);
     }
 
@@ -521,161 +565,234 @@ public class HandleHttpRequest extends AbstractProcessor {
 
         final long start = System.nanoTime();
         final HttpServletRequest request = container.getRequest();
-        FlowFile flowFile = session.create();
-        try (OutputStream flowFileOut = session.write(flowFile)) {
-            StreamUtils.copy(request.getInputStream(), flowFileOut);
-        } catch (final IOException e) {
-            // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg.
-            // bad requests, the connection to the client is not closed. In order to address also these cases, we try
-            // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly
-            // processed and makes it aware that the connection can be closed.
-            getLogger().error("Failed to receive content from HTTP Request from {} due to {}",
-                    new Object[]{request.getRemoteAddr(), e});
-            session.remove(flowFile);
 
-            try {
-                HttpServletResponse response = container.getResponse();
-                response.sendError(Status.BAD_REQUEST.getStatusCode());
-                response.flushBuffer();
-                container.getContext().complete();
-            } catch (final IOException ioe) {
-                getLogger().warn("Failed to send HTTP response to {} due to {}",
-                        new Object[]{request.getRemoteAddr(), ioe});
+        if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) {
+          final long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
+          final int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+          String tempDir = System.getProperty("java.io.tmpdir");
+          request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, requestMaxSize, requestMaxSize, readBufferSize));
+          try {
+            List<Part> parts = ImmutableList.copyOf(request.getParts());
+            int allPartsCount = parts.size();
+            final String contextIdentifier = UUID.randomUUID().toString();
+            for (int i = 0; i < allPartsCount; i++) {
+              Part part = parts.get(i);
+              FlowFile flowFile = session.create();
+              try (OutputStream flowFileOut = session.write(flowFile)) {
+                StreamUtils.copy(part.getInputStream(), flowFileOut);
+              } catch (IOException e) {
+                handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e);
+                return;
+              }
+              flowFile = savePartAttributes(context, session, part, flowFile, i, allPartsCount);
+              flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
+              if (i == 0) {
+                // each one of multipart comes from a single request, thus registering only once per loop.
+                boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile);
+                if (!requestRegistrationSuccess)
+                  break;
+              }
+              forwardFlowFile(context, session, container, start, request, flowFile);
             }
+          } catch (IOException | ServletException | IllegalStateException e) {
+            handleFlowContentStreamingError(session, container, request, Optional.absent(), e);
             return;
+          }
+        } else {
+          FlowFile flowFile = session.create();
+          try (OutputStream flowFileOut = session.write(flowFile)) {
+            StreamUtils.copy(request.getInputStream(), flowFileOut);
+          } catch (final IOException e) {
+            handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e);
+            return;
+          }
+          final String contextIdentifier = UUID.randomUUID().toString();
+          flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
+          boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile);
+          if (requestRegistrationSuccess)
+            forwardFlowFile(context, session, container, start, request, flowFile);
         }
+    }
 
-        final String charset = request.getCharacterEncoding() == null ? context.getProperty(URL_CHARACTER_SET).getValue() : request.getCharacterEncoding();
-
-        final String contextIdentifier = UUID.randomUUID().toString();
-        final Map<String, String> attributes = new HashMap<>();
-        try {
-            putAttribute(attributes, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier);
-            putAttribute(attributes, "mime.type", request.getContentType());
-            putAttribute(attributes, "http.servlet.path", request.getServletPath());
-            putAttribute(attributes, "http.context.path", request.getContextPath());
-            putAttribute(attributes, "http.method", request.getMethod());
-            putAttribute(attributes, "http.local.addr", request.getLocalAddr());
-            putAttribute(attributes, HTTPUtils.HTTP_LOCAL_NAME, request.getLocalName());
-            final String queryString = request.getQueryString();
-            if (queryString != null) {
-                putAttribute(attributes, "http.query.string", URLDecoder.decode(queryString, charset));
-            }
-            putAttribute(attributes, HTTPUtils.HTTP_REMOTE_HOST, request.getRemoteHost());
-            putAttribute(attributes, "http.remote.addr", request.getRemoteAddr());
-            putAttribute(attributes, "http.remote.user", request.getRemoteUser());
-            putAttribute(attributes, "http.protocol", request.getProtocol());
-            putAttribute(attributes, HTTPUtils.HTTP_REQUEST_URI, request.getRequestURI());
-            putAttribute(attributes, "http.request.url", request.getRequestURL().toString());
-            putAttribute(attributes, "http.auth.type", request.getAuthType());
-
-            putAttribute(attributes, "http.requested.session.id", request.getRequestedSessionId());
-            final DispatcherType dispatcherType = request.getDispatcherType();
-            if (dispatcherType != null) {
-                putAttribute(attributes, "http.dispatcher.type", dispatcherType.name());
-            }
-            putAttribute(attributes, "http.character.encoding", request.getCharacterEncoding());
-            putAttribute(attributes, "http.locale", request.getLocale());
-            putAttribute(attributes, "http.server.name", request.getServerName());
-            putAttribute(attributes, HTTPUtils.HTTP_PORT, request.getServerPort());
-
-            final Enumeration<String> paramEnumeration = request.getParameterNames();
-            while (paramEnumeration.hasMoreElements()) {
-                final String paramName = paramEnumeration.nextElement();
-                final String value = request.getParameter(paramName);
-                attributes.put("http.param." + paramName, value);
-            }
-
-            final Cookie[] cookies = request.getCookies();
-            if (cookies != null) {
-                for (final Cookie cookie : cookies) {
-                    final String name = cookie.getName();
-                    final String cookiePrefix = "http.cookie." + name + ".";
-                    attributes.put(cookiePrefix + "value", cookie.getValue());
-                    attributes.put(cookiePrefix + "domain", cookie.getDomain());
-                    attributes.put(cookiePrefix + "path", cookie.getPath());
-                    attributes.put(cookiePrefix + "max.age", String.valueOf(cookie.getMaxAge()));
-                    attributes.put(cookiePrefix + "version", String.valueOf(cookie.getVersion()));
-                    attributes.put(cookiePrefix + "secure", String.valueOf(cookie.getSecure()));
-                }
-            }
-
-            if (queryString != null) {
-                final String[] params = URL_QUERY_PARAM_DELIMITER.split(queryString);
-                for (final String keyValueString : params) {
-                    final int indexOf = keyValueString.indexOf("=");
-                    if (indexOf < 0) {
-                        // no =, then it's just a key with no value
-                        attributes.put("http.query.param." + URLDecoder.decode(keyValueString, charset), "");
-                    } else {
-                        final String key = keyValueString.substring(0, indexOf);
-                        final String value;
-
-                        if (indexOf == keyValueString.length() - 1) {
-                            value = "";
-                        } else {
-                            value = keyValueString.substring(indexOf + 1);
-                        }
-
-                        attributes.put("http.query.param." + URLDecoder.decode(key, charset), URLDecoder.decode(value, charset));
-                    }
-                }
-            }
-        } catch (final UnsupportedEncodingException uee) {
-            throw new ProcessException("Invalid character encoding", uee);  // won't happen because charset has been validated
-        }
-
-        final Enumeration<String> headerNames = request.getHeaderNames();
-        while (headerNames.hasMoreElements()) {
-            final String headerName = headerNames.nextElement();
-            final String headerValue = request.getHeader(headerName);
-            putAttribute(attributes, "http.headers." + headerName, headerValue);
-        }
-
-        final Principal principal = request.getUserPrincipal();
-        if (principal != null) {
-            putAttribute(attributes, "http.principal.name", principal.getName());
-        }
+    private FlowFile savePartAttributes(ProcessContext context, ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) {
+      final Map<String, String> attributes = new HashMap<>();
+      for (String headerName : part.getHeaderNames()) {
+        final String headerValue = part.getHeader(headerName);
+        putAttribute(attributes, "http.headers.multipart." + headerName, headerValue);
+      }
+      putAttribute(attributes, "http.multipart.size", part.getSize());
+      putAttribute(attributes, "http.multipart.content.type", part.getContentType());
+      putAttribute(attributes, "http.multipart.name", part.getName());
+      putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName());
+      putAttribute(attributes, "http.multipart.fragments.sequence.number", i+1);
+      putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount);
+      return session.putAllAttributes(flowFile, attributes);
+    }
 
-        final X509Certificate certs[] = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
-        final String subjectDn;
-        if (certs != null && certs.length > 0) {
-            final X509Certificate cert = certs[0];
-            subjectDn = cert.getSubjectDN().getName();
-            final String issuerDn = cert.getIssuerDN().getName();
+    private FlowFile saveRequestAttributes(final ProcessContext context, final ProcessSession session, HttpServletRequest request, FlowFile flowFile, String contextIdentifier) {
+      final String charset = request.getCharacterEncoding() == null ? context.getProperty(URL_CHARACTER_SET).getValue() : request.getCharacterEncoding();
+
+      final Map<String, String> attributes = new HashMap<>();
+      try {
+          putAttribute(attributes, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier);
+          putAttribute(attributes, "mime.type", request.getContentType());
+          putAttribute(attributes, "http.servlet.path", request.getServletPath());
+          putAttribute(attributes, "http.context.path", request.getContextPath());
+          putAttribute(attributes, "http.method", request.getMethod());
+          putAttribute(attributes, "http.local.addr", request.getLocalAddr());
+          putAttribute(attributes, HTTPUtils.HTTP_LOCAL_NAME, request.getLocalName());
+          final String queryString = request.getQueryString();
+          if (queryString != null) {
+              putAttribute(attributes, "http.query.string", URLDecoder.decode(queryString, charset));
+          }
+          putAttribute(attributes, HTTPUtils.HTTP_REMOTE_HOST, request.getRemoteHost());
+          putAttribute(attributes, "http.remote.addr", request.getRemoteAddr());
+          putAttribute(attributes, "http.remote.user", request.getRemoteUser());
+          putAttribute(attributes, "http.protocol", request.getProtocol());
+          putAttribute(attributes, HTTPUtils.HTTP_REQUEST_URI, request.getRequestURI());
+          putAttribute(attributes, "http.request.url", request.getRequestURL().toString());
+          putAttribute(attributes, "http.auth.type", request.getAuthType());
+
+          putAttribute(attributes, "http.requested.session.id", request.getRequestedSessionId());
+          final DispatcherType dispatcherType = request.getDispatcherType();
+          if (dispatcherType != null) {
+              putAttribute(attributes, "http.dispatcher.type", dispatcherType.name());
+          }
+          putAttribute(attributes, "http.character.encoding", request.getCharacterEncoding());
+          putAttribute(attributes, "http.locale", request.getLocale());
+          putAttribute(attributes, "http.server.name", request.getServerName());
+          putAttribute(attributes, HTTPUtils.HTTP_PORT, request.getServerPort());
+
+          final Enumeration<String> paramEnumeration = request.getParameterNames();
+          while (paramEnumeration.hasMoreElements()) {
+              final String paramName = paramEnumeration.nextElement();
+              final String value = request.getParameter(paramName);
+              attributes.put("http.param." + paramName, value);
+          }
+
+          final Cookie[] cookies = request.getCookies();
+          if (cookies != null) {
+              for (final Cookie cookie : cookies) {
+                  final String name = cookie.getName();
+                  final String cookiePrefix = "http.cookie." + name + ".";
+                  attributes.put(cookiePrefix + "value", cookie.getValue());
+                  attributes.put(cookiePrefix + "domain", cookie.getDomain());
+                  attributes.put(cookiePrefix + "path", cookie.getPath());
+                  attributes.put(cookiePrefix + "max.age", String.valueOf(cookie.getMaxAge()));
+                  attributes.put(cookiePrefix + "version", String.valueOf(cookie.getVersion()));
+                  attributes.put(cookiePrefix + "secure", String.valueOf(cookie.getSecure()));
+              }
+          }
+
+          if (queryString != null) {
+              final String[] params = URL_QUERY_PARAM_DELIMITER.split(queryString);
+              for (final String keyValueString : params) {
+                  final int indexOf = keyValueString.indexOf("=");
+                  if (indexOf < 0) {
+                      // no =, then it's just a key with no value
+                      attributes.put("http.query.param." + URLDecoder.decode(keyValueString, charset), "");
+                  } else {
+                      final String key = keyValueString.substring(0, indexOf);
+                      final String value;
+
+                      if (indexOf == keyValueString.length() - 1) {
+                          value = "";
+                      } else {
+                          value = keyValueString.substring(indexOf + 1);
+                      }
+
+                      attributes.put("http.query.param." + URLDecoder.decode(key, charset), URLDecoder.decode(value, charset));
+                  }
+              }
+          }
+      } catch (final UnsupportedEncodingException uee) {
+          throw new ProcessException("Invalid character encoding", uee);  // won't happen because charset has been validated
+      }
+
+      final Enumeration<String> headerNames = request.getHeaderNames();
+      while (headerNames.hasMoreElements()) {
+          final String headerName = headerNames.nextElement();
+          final String headerValue = request.getHeader(headerName);
+          putAttribute(attributes, "http.headers." + headerName, headerValue);
+      }
+
+      final Principal principal = request.getUserPrincipal();
+      if (principal != null) {
+          putAttribute(attributes, "http.principal.name", principal.getName());
+      }
+
+      final X509Certificate certs[] = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
+      final String subjectDn;
+      if (certs != null && certs.length > 0) {
+          final X509Certificate cert = certs[0];
+          subjectDn = cert.getSubjectDN().getName();
+          final String issuerDn = cert.getIssuerDN().getName();
+
+          putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn);
+          putAttribute(attributes, "http.issuer.dn", issuerDn);
+      } else {
+          subjectDn = null;
+      }
+
+      return session.putAllAttributes(flowFile, attributes);
+    }
 
-            putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn);
-            putAttribute(attributes, "http.issuer.dn", issuerDn);
-        } else {
-            subjectDn = null;
-        }
+    private void forwardFlowFile(final ProcessContext context, final ProcessSession session,
+        HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) {
+      final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+      String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT);
+      session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(flowFile.getAttributes()),
+          "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis);
+      session.transfer(flowFile, REL_SUCCESS);
+      getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()});
+    }
 
-        flowFile = session.putAllAttributes(flowFile, attributes);
 
+    private boolean registerRequest(final ProcessContext context, final ProcessSession session,
+        HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) {
         final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);
+        String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
         final boolean registered = contextMap.register(contextIdentifier, request, container.getResponse(), container.getContext());
+        if (registered)
+          return true;
 
-        if (!registered) {
-            getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE",
-                    new Object[]{request.getRemoteAddr()});
-
-            try {
-                container.getResponse().setStatus(Status.SERVICE_UNAVAILABLE.getStatusCode());
-                container.getResponse().flushBuffer();
-                container.getContext().complete();
-            } catch (final Exception e) {
-                getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}",
-                        new Object[]{request.getRemoteAddr(), e});
-            }
+        getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE",
+            new Object[]{request.getRemoteAddr()});
 
-            session.remove(flowFile);
-            return;
+        try {
+          container.getResponse().setStatus(Status.SERVICE_UNAVAILABLE.getStatusCode());
+          container.getResponse().flushBuffer();
+          container.getContext().complete();
+        } catch (final Exception e) {
+          getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}",
+              new Object[]{request.getRemoteAddr(), e});
         }
 
-        final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(attributes), "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis);
-        session.transfer(flowFile, REL_SUCCESS);
-        getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()});
+        session.remove(flowFile);
+        return false;
+    }
+
+
+    protected void handleFlowContentStreamingError(final ProcessSession session, HttpRequestContainer container,
+        final HttpServletRequest request, Optional<FlowFile> flowFile, final Exception e) {
+      // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg.
+      // bad requests, the connection to the client is not closed. In order to address also these cases, we try
+      // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly
+      // processed and makes it aware that the connection can be closed.
+      getLogger().error("Failed to receive content from HTTP Request from {} due to {}",
+              new Object[]{request.getRemoteAddr(), e});
+      if (flowFile.isPresent())
+        session.remove(flowFile.get());
+
+      try {
+          HttpServletResponse response = container.getResponse();
+          response.sendError(Status.BAD_REQUEST.getStatusCode());
+          response.flushBuffer();
+          container.getContext().complete();
+      } catch (final IOException ioe) {
+          getLogger().warn("Failed to send HTTP response to {} due to {}",
+                  new Object[]{request.getRemoteAddr(), ioe});
+      }
     }
 
     private void putAttribute(final Map<String, String> map, final String key, final Object value) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/d5f071b2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html
index 70adb85..6510e12 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html
@@ -39,5 +39,18 @@
             The HandleHttpRequest Processor provides several Properties to configure which methods are supported, the paths that are
             supported, and SSL configuration. 
         </p>
+
+        <p>
+            To handle requests with Content-Type: <i>multipart/form-data</i> containing multiple parts, additional attention needs to be paid.
+            Each <i>part</i> generates a FlowFile of its own. To each of these FlowFiles, some special attributes are written:
+            <ul>
+                <li>http.context.identifier</li>
+                <li>http.multipart.fragments.sequence.number</li>
+                <li>http.multipart.fragments.total.number</li>
+            </ul>
+            These attributes can be used to implement a gating mechanism in front of the HandleHttpResponse processor, so that the response
+            is held back until all <b>http.multipart.fragments.total.number</b> FlowFiles belonging to the same <b>http.context.identifier</b>
+            (which is unique to the request) have been processed; <b>http.multipart.fragments.sequence.number</b> identifies each individual part.
+        </p>
     </body>
 </html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d5f071b2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
index 8c98a1c..6352291 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
@@ -18,15 +18,19 @@ package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLContext;
@@ -36,6 +40,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.http.HttpContextMap;
+import org.apache.nifi.processors.standard.util.HTTPUtils;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
@@ -48,6 +53,18 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.api.client.util.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
 public class TestHandleHttpRequest {
 
     private static Map<String, String> getTruststoreProperties() {
@@ -141,6 +158,214 @@ public class TestHandleHttpRequest {
 
 
     @Test(timeout=10000)
+    public void testMultipartFormDataRequest() throws InitializationException, MalformedURLException, IOException, InterruptedException {
+      // Posts a multipart/form-data request made of two form fields and three file parts, then checks
+      // that five FlowFiles are transferred to success and that each carries the multipart attributes.
+      final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+      runner.setProperty(HandleHttpRequest.PORT, "0");
+
+      final MockHttpContextMap contextMap = new MockHttpContextMap();
+      runner.addControllerService("http-context-map", contextMap);
+      runner.enableControllerService(contextMap);
+      runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+
+      // run the processor once without stopping it, so that it is still up for the request sent below.
+      runner.run(1, false);
+      try {
+        final Thread httpThread = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            try {
+
+              final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+
+              // Every file part is backed by its own file on disk: sharing one path between parts would
+              // make them upload the same (concatenated) content.
+              MultipartBody multipartBody = new MultipartBody.Builder()
+                .setType(MultipartBody.FORM)
+                .addFormDataPart("p1", "v1")
+                .addFormDataPart("p2", "v2")
+                .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+                .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-data.json", "{ \"name\":\"John\", \"age\":30 }")))
+                .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+                .build();
+
+              Request request = new Request.Builder()
+              .url(String.format("http://localhost:%s/my/path", port))
+              .post(multipartBody).build();
+
+              OkHttpClient client =
+                  new OkHttpClient.Builder()
+                    .readTimeout(3000, TimeUnit.MILLISECONDS)
+                    .writeTimeout(3000, TimeUnit.MILLISECONDS)
+                  .build();
+
+              try (Response response = client.newCall(request).execute()) {
+                Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful());
+              }
+            } catch (final Throwable t) {
+              t.printStackTrace();
+              Assert.fail(t.toString());
+            }
+          }
+        });
+        httpThread.start();
+
+        while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
+          // process the request.
+          runner.run(1, false, false);
+        }
+
+        runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 5);
+        assertEquals(1, contextMap.size());
+
+        List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS);
+
+        // Part fragments are not processed in the order we submitted them.
+        // We cannot rely on the order we sent them in.
+        MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p1");
+        String contextId = mff.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
+        mff.assertAttributeEquals("http.multipart.name", "p1");
+        mff.assertAttributeExists("http.multipart.size");
+        mff.assertAttributeEquals("http.multipart.fragments.sequence.number", "1");
+        mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+        mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+
+        mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p2");
+        // each part generates a corresponding flow file - yet all parts are coming from the same request,
+        mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
+        mff.assertAttributeEquals("http.multipart.name", "p2");
+        mff.assertAttributeExists("http.multipart.size");
+        mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+        mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+        mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+
+        mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file1");
+        mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
+        mff.assertAttributeEquals("http.multipart.name", "file1");
+        mff.assertAttributeEquals("http.multipart.filename", "my-file-text.txt");
+        mff.assertAttributeEquals("http.headers.multipart.content-type", "text/plain");
+        mff.assertAttributeExists("http.multipart.size");
+        mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+        mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+        mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+
+        mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file2");
+        mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
+        mff.assertAttributeEquals("http.multipart.name", "file2");
+        mff.assertAttributeEquals("http.multipart.filename", "my-file-data.json");
+        mff.assertAttributeEquals("http.headers.multipart.content-type", "application/json");
+        mff.assertAttributeExists("http.multipart.size");
+        mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+        mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+        mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+
+        mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file3");
+        mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
+        mff.assertAttributeEquals("http.multipart.name", "file3");
+        mff.assertAttributeEquals("http.multipart.filename", "my-file-binary.bin");
+        mff.assertAttributeEquals("http.headers.multipart.content-type", "application/octet-stream");
+        mff.assertAttributeExists("http.multipart.size");
+        mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+        mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+        mff.assertAttributeExists("http.headers.multipart.content-disposition");
+      } finally {
+        // shut down the server
+        runner.run(1, true);
+      }
+    }
+
+    /**
+     * Sends the same multipart/form-data request as {@code testMultipartFormDataRequest}, but against a
+     * context map configured with {@code setRegisterSuccessfully(false)}. Expects no FlowFile on the
+     * success relationship, an empty context map and an HTTP 503 response for the client.
+     */
+    @Test(timeout=10000)
+    public void testMultipartFormDataRequestFailToRegisterContext() throws InitializationException, MalformedURLException, IOException, InterruptedException {
+      final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+      runner.setProperty(HandleHttpRequest.PORT, "0");
+
+      final MockHttpContextMap contextMap = new MockHttpContextMap();
+      contextMap.setRegisterSuccessfully(false);
+      runner.addControllerService("http-context-map", contextMap);
+      runner.enableControllerService(contextMap);
+      runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+
+      // run the processor once without stopping it, so that it is still up for the request sent below.
+      runner.run(1, false);
+      try {
+        // written by the client thread once a response arrives; 0 means "no response yet".
+        AtomicInteger responseCode = new AtomicInteger(0);
+        final Thread httpThread = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            try {
+
+              final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+
+              // Every file part is backed by its own file on disk: sharing one path between parts would
+              // make them upload the same (concatenated) content.
+              MultipartBody multipartBody = new MultipartBody.Builder()
+                .setType(MultipartBody.FORM)
+                .addFormDataPart("p1", "v1")
+                .addFormDataPart("p2", "v2")
+                .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+                .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-data.json", "{ \"name\":\"John\", \"age\":30 }")))
+                .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+                .build();
+
+              Request request = new Request.Builder()
+              .url(String.format("http://localhost:%s/my/path", port))
+              .post(multipartBody).build();
+
+              OkHttpClient client =
+                  new OkHttpClient.Builder()
+                    .readTimeout(3000, TimeUnit.MILLISECONDS)
+                    .writeTimeout(3000, TimeUnit.MILLISECONDS)
+                  .build();
+
+              try (Response response = client.newCall(request).execute()) {
+                responseCode.set(response.code());
+              }
+            } catch (final Throwable t) {
+              t.printStackTrace();
+              Assert.fail(t.toString());
+            }
+          }
+        });
+        httpThread.start();
+
+        while (responseCode.get() == 0) {
+          // process the request.
+          runner.run(1, false, false);
+        }
+
+        runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 0);
+        assertEquals(0, contextMap.size());
+        Assert.assertEquals(503, responseCode.get());
+      } finally {
+        // shut down the server
+        runner.run(1, true);
+      }
+    }
+
+    /**
+     * Creates a byte array filled with random data.
+     *
+     * @param i the number of random bytes to generate
+     * @return a new array of exactly {@code i} random bytes
+     */
+    private byte[] generateRandomBinaryData(int i) {
+      // honour the requested size instead of a hard-coded length
+      byte[] bytes = new byte[i];
+      new Random().nextBytes(bytes);
+      return bytes;
+    }
+
+
+    /**
+     * Creates (or overwrites) a text file whose content is the given lines concatenated without any
+     * separator, encoded as UTF-8. The file is registered for deletion when the JVM exits.
+     *
+     * @param fileName path of the file to write, resolved against the working directory
+     * @param lines the pieces of text to write, in order
+     * @return the written file
+     * @throws IOException if the file cannot be written
+     */
+    private File createTextFile(String fileName, String... lines) throws IOException {
+      File file = new File(fileName);
+      file.deleteOnExit();
+      // Write everything in one go: this replaces whatever an earlier call or test run left under the
+      // same name (appending would accumulate content) and creates the file even when no lines are given.
+      Files.write(String.join("", lines), file, Charsets.UTF_8);
+      return file;
+    }
+
+
+    /**
+     * Returns the first FlowFile in the list whose attribute {@code attributeName} equals
+     * {@code attributeValue}, failing the test if there is none.
+     *
+     * @param flowFilesForRelationship the FlowFiles to search
+     * @param attributeName name of the attribute to compare
+     * @param attributeValue expected value of the attribute
+     * @return the first matching FlowFile
+     */
+    protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) {
+      // null-safe comparison: a FlowFile that lacks the attribute must simply not match, rather than
+      // abort the search with a NullPointerException
+      Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship,
+          ff -> java.util.Objects.equals(attributeValue, ff.getAttribute(attributeName)));
+      Assert.assertTrue(String.format("No FlowFile found with attribute %s=%s", attributeName, attributeValue), optional.isPresent());
+      return optional.get();
+    }
+
+
+    @Test(timeout=10000)
     public void testFailToRegister() throws InitializationException, MalformedURLException, IOException, InterruptedException {
         final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
         runner.setProperty(HandleHttpRequest.PORT, "0");