You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/10/15 17:14:49 UTC
nifi git commit: NIFI-3469 multipart request support added to
HandleHttpRequest - introducing an in-memory-file-size-threshold,
above which the incoming file is written to local disk - using java.io.tmpdir
for such file writes - enhancing documentation -
Repository: nifi
Updated Branches:
refs/heads/master ce25ae541 -> d5f071b2f
NIFI-3469 multipart request support added to HandleHttpRequest
- introducing an in-memory-file-size-threshold, above which the incoming file is written to local disk
- using java.io.tmpdir for such file writes
- enhancing documentation
- documenting how to avoid premature HTTP response
- fix and UT for unsuccessful request registration
This closes #2991.
Signed-off-by: Mark Payne <ma...@hotmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d5f071b2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d5f071b2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d5f071b2
Branch: refs/heads/master
Commit: d5f071b2fe5240d4face9b7c8582ba45fd668294
Parents: ce25ae5
Author: Endre Zoltan Kovacs <an...@protonmail.com>
Authored: Tue Sep 4 16:35:37 2018 +0200
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Oct 15 13:14:31 2018 -0400
----------------------------------------------------------------------
.../processors/standard/HandleHttpRequest.java | 401 ++++++++++++-------
.../additionalDetails.html | 13 +
.../standard/TestHandleHttpRequest.java | 225 +++++++++++
3 files changed, 497 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d5f071b2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index ea27188..d03774e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -32,6 +32,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.http.HttpContextMap;
import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -52,12 +53,18 @@ import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+
import javax.servlet.AsyncContext;
import javax.servlet.DispatcherType;
+import javax.servlet.MultipartConfigElement;
import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.Part;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.io.OutputStream;
@@ -109,10 +116,27 @@ import java.util.regex.Pattern;
+ "This value will not be populated unless the Processor is configured to use an SSLContext Service"),
@WritesAttribute(attribute = "http.headers.XXX", description = "Each of the HTTP Headers that is received in the request will be added as an "
+ "attribute, prefixed with \"http.headers.\" For example, if the request contains an HTTP Header named \"x-my-header\", then the value "
- + "will be added to an attribute named \"http.headers.x-my-header\"")})
+ + "will be added to an attribute named \"http.headers.x-my-header\""),
+ @WritesAttribute(attribute = "http.headers.multipart.XXX", description = "Each of the HTTP Headers that is received in the mulipart request will be added as an "
+ + "attribute, prefixed with \"http.headers.multipart.\" For example, if the multipart request contains an HTTP Header named \"content-disposition\", then the value "
+ + "will be added to an attribute named \"http.headers.multipart.content-disposition\""),
+ @WritesAttribute(attribute = "http.multipart.size",
+ description = "For requests with Content-Type \"multipart/form-data\", the part's content size is recorded into this attribute"),
+ @WritesAttribute(attribute = "http.multipart.content.type",
+ description = "For requests with Content-Type \"multipart/form-data\", the part's content type is recorded into this attribute"),
+ @WritesAttribute(attribute = "http.multipart.name",
+ description = "For requests with Content-Type \"multipart/form-data\", the part's name is recorded into this attribute"),
+ @WritesAttribute(attribute = "http.multipart.filename",
+ description = "For requests with Content-Type \"multipart/form-data\", when the part contains an uploaded file, the name of the file is recorded into this attribute"),
+ @WritesAttribute(attribute = "http.multipart.fragments.sequence.number",
+ description = "For requests with Content-Type \"multipart/form-data\", the part's index is recorded into this attribute. The index starts with 1."),
+ @WritesAttribute(attribute = "http.multipart.fragments.total.number",
+ description = "For requests with Content-Type \"multipart/form-data\", the count of all parts is recorded into this attribute.")})
@SeeAlso(value = {HandleHttpResponse.class})
public class HandleHttpRequest extends AbstractProcessor {
+ private static final String MIME_TYPE__MULTIPART_FORM_DATA = "multipart/form-data";
+
private static final Pattern URL_QUERY_PARAM_DELIMITER = Pattern.compile("&");
// Allowable values for client auth
@@ -229,7 +253,25 @@ public class HandleHttpRequest extends AbstractProcessor {
.name("container-queue-size").displayName("Container Queue Size")
.description("The size of the queue for Http Request Containers").required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("50").build();
-
+ public static final PropertyDescriptor MULTIPART_REQUEST_MAX_SIZE = new PropertyDescriptor.Builder()
+ .name("multipart-request-max-size")
+ .displayName("Multipart Request Max Size")
+ .description("The max size of the request. Only applies for requests with Content-Type: multipart/form-data, "
+ + "and is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .build();
+ public static final PropertyDescriptor MULTIPART_READ_BUFFER_SIZE = new PropertyDescriptor.Builder()
+ .name("multipart-read-buffer-size")
+ .description("The threshold size, at which the contents of an incoming file would be written to disk. "
+ + "Only applies for requests with Content-Type: multipart/form-data. "
+ + "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.")
+ .displayName("Multipart Read Buffer Size")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("512 KB")
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All content that is received is routed to the 'success' relationship")
@@ -254,6 +296,8 @@ public class HandleHttpRequest extends AbstractProcessor {
descriptors.add(ADDITIONAL_METHODS);
descriptors.add(CLIENT_AUTH);
descriptors.add(CONTAINER_QUEUE_SIZE);
+ descriptors.add(MULTIPART_REQUEST_MAX_SIZE);
+ descriptors.add(MULTIPART_READ_BUFFER_SIZE);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@@ -521,161 +565,234 @@ public class HandleHttpRequest extends AbstractProcessor {
final long start = System.nanoTime();
final HttpServletRequest request = container.getRequest();
- FlowFile flowFile = session.create();
- try (OutputStream flowFileOut = session.write(flowFile)) {
- StreamUtils.copy(request.getInputStream(), flowFileOut);
- } catch (final IOException e) {
- // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg.
- // bad requests, the connection to the client is not closed. In order to address also these cases, we try
- // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly
- // processed and makes it aware that the connection can be closed.
- getLogger().error("Failed to receive content from HTTP Request from {} due to {}",
- new Object[]{request.getRemoteAddr(), e});
- session.remove(flowFile);
- try {
- HttpServletResponse response = container.getResponse();
- response.sendError(Status.BAD_REQUEST.getStatusCode());
- response.flushBuffer();
- container.getContext().complete();
- } catch (final IOException ioe) {
- getLogger().warn("Failed to send HTTP response to {} due to {}",
- new Object[]{request.getRemoteAddr(), ioe});
+ if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) {
+ final long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
+ final int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ String tempDir = System.getProperty("java.io.tmpdir");
+ request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, requestMaxSize, requestMaxSize, readBufferSize));
+ try {
+ List<Part> parts = ImmutableList.copyOf(request.getParts());
+ int allPartsCount = parts.size();
+ final String contextIdentifier = UUID.randomUUID().toString();
+ for (int i = 0; i < allPartsCount; i++) {
+ Part part = parts.get(i);
+ FlowFile flowFile = session.create();
+ try (OutputStream flowFileOut = session.write(flowFile)) {
+ StreamUtils.copy(part.getInputStream(), flowFileOut);
+ } catch (IOException e) {
+ handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e);
+ return;
+ }
+ flowFile = savePartAttributes(context, session, part, flowFile, i, allPartsCount);
+ flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
+ if (i == 0) {
+ // each one of multipart comes from a single request, thus registering only once per loop.
+ boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile);
+ if (!requestRegistrationSuccess)
+ break;
+ }
+ forwardFlowFile(context, session, container, start, request, flowFile);
}
+ } catch (IOException | ServletException | IllegalStateException e) {
+ handleFlowContentStreamingError(session, container, request, Optional.absent(), e);
return;
+ }
+ } else {
+ FlowFile flowFile = session.create();
+ try (OutputStream flowFileOut = session.write(flowFile)) {
+ StreamUtils.copy(request.getInputStream(), flowFileOut);
+ } catch (final IOException e) {
+ handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e);
+ return;
+ }
+ final String contextIdentifier = UUID.randomUUID().toString();
+ flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
+ boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile);
+ if (requestRegistrationSuccess)
+ forwardFlowFile(context, session, container, start, request, flowFile);
}
+ }
- final String charset = request.getCharacterEncoding() == null ? context.getProperty(URL_CHARACTER_SET).getValue() : request.getCharacterEncoding();
-
- final String contextIdentifier = UUID.randomUUID().toString();
- final Map<String, String> attributes = new HashMap<>();
- try {
- putAttribute(attributes, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier);
- putAttribute(attributes, "mime.type", request.getContentType());
- putAttribute(attributes, "http.servlet.path", request.getServletPath());
- putAttribute(attributes, "http.context.path", request.getContextPath());
- putAttribute(attributes, "http.method", request.getMethod());
- putAttribute(attributes, "http.local.addr", request.getLocalAddr());
- putAttribute(attributes, HTTPUtils.HTTP_LOCAL_NAME, request.getLocalName());
- final String queryString = request.getQueryString();
- if (queryString != null) {
- putAttribute(attributes, "http.query.string", URLDecoder.decode(queryString, charset));
- }
- putAttribute(attributes, HTTPUtils.HTTP_REMOTE_HOST, request.getRemoteHost());
- putAttribute(attributes, "http.remote.addr", request.getRemoteAddr());
- putAttribute(attributes, "http.remote.user", request.getRemoteUser());
- putAttribute(attributes, "http.protocol", request.getProtocol());
- putAttribute(attributes, HTTPUtils.HTTP_REQUEST_URI, request.getRequestURI());
- putAttribute(attributes, "http.request.url", request.getRequestURL().toString());
- putAttribute(attributes, "http.auth.type", request.getAuthType());
-
- putAttribute(attributes, "http.requested.session.id", request.getRequestedSessionId());
- final DispatcherType dispatcherType = request.getDispatcherType();
- if (dispatcherType != null) {
- putAttribute(attributes, "http.dispatcher.type", dispatcherType.name());
- }
- putAttribute(attributes, "http.character.encoding", request.getCharacterEncoding());
- putAttribute(attributes, "http.locale", request.getLocale());
- putAttribute(attributes, "http.server.name", request.getServerName());
- putAttribute(attributes, HTTPUtils.HTTP_PORT, request.getServerPort());
-
- final Enumeration<String> paramEnumeration = request.getParameterNames();
- while (paramEnumeration.hasMoreElements()) {
- final String paramName = paramEnumeration.nextElement();
- final String value = request.getParameter(paramName);
- attributes.put("http.param." + paramName, value);
- }
-
- final Cookie[] cookies = request.getCookies();
- if (cookies != null) {
- for (final Cookie cookie : cookies) {
- final String name = cookie.getName();
- final String cookiePrefix = "http.cookie." + name + ".";
- attributes.put(cookiePrefix + "value", cookie.getValue());
- attributes.put(cookiePrefix + "domain", cookie.getDomain());
- attributes.put(cookiePrefix + "path", cookie.getPath());
- attributes.put(cookiePrefix + "max.age", String.valueOf(cookie.getMaxAge()));
- attributes.put(cookiePrefix + "version", String.valueOf(cookie.getVersion()));
- attributes.put(cookiePrefix + "secure", String.valueOf(cookie.getSecure()));
- }
- }
-
- if (queryString != null) {
- final String[] params = URL_QUERY_PARAM_DELIMITER.split(queryString);
- for (final String keyValueString : params) {
- final int indexOf = keyValueString.indexOf("=");
- if (indexOf < 0) {
- // no =, then it's just a key with no value
- attributes.put("http.query.param." + URLDecoder.decode(keyValueString, charset), "");
- } else {
- final String key = keyValueString.substring(0, indexOf);
- final String value;
-
- if (indexOf == keyValueString.length() - 1) {
- value = "";
- } else {
- value = keyValueString.substring(indexOf + 1);
- }
-
- attributes.put("http.query.param." + URLDecoder.decode(key, charset), URLDecoder.decode(value, charset));
- }
- }
- }
- } catch (final UnsupportedEncodingException uee) {
- throw new ProcessException("Invalid character encoding", uee); // won't happen because charset has been validated
- }
-
- final Enumeration<String> headerNames = request.getHeaderNames();
- while (headerNames.hasMoreElements()) {
- final String headerName = headerNames.nextElement();
- final String headerValue = request.getHeader(headerName);
- putAttribute(attributes, "http.headers." + headerName, headerValue);
- }
-
- final Principal principal = request.getUserPrincipal();
- if (principal != null) {
- putAttribute(attributes, "http.principal.name", principal.getName());
- }
+ private FlowFile savePartAttributes(ProcessContext context, ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) {
+ final Map<String, String> attributes = new HashMap<>();
+ for (String headerName : part.getHeaderNames()) {
+ final String headerValue = part.getHeader(headerName);
+ putAttribute(attributes, "http.headers.multipart." + headerName, headerValue);
+ }
+ putAttribute(attributes, "http.multipart.size", part.getSize());
+ putAttribute(attributes, "http.multipart.content.type", part.getContentType());
+ putAttribute(attributes, "http.multipart.name", part.getName());
+ putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName());
+ putAttribute(attributes, "http.multipart.fragments.sequence.number", i+1);
+ putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount);
+ return session.putAllAttributes(flowFile, attributes);
+ }
- final X509Certificate certs[] = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
- final String subjectDn;
- if (certs != null && certs.length > 0) {
- final X509Certificate cert = certs[0];
- subjectDn = cert.getSubjectDN().getName();
- final String issuerDn = cert.getIssuerDN().getName();
+ private FlowFile saveRequestAttributes(final ProcessContext context, final ProcessSession session, HttpServletRequest request, FlowFile flowFile, String contextIdentifier) {
+ final String charset = request.getCharacterEncoding() == null ? context.getProperty(URL_CHARACTER_SET).getValue() : request.getCharacterEncoding();
+
+ final Map<String, String> attributes = new HashMap<>();
+ try {
+ putAttribute(attributes, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier);
+ putAttribute(attributes, "mime.type", request.getContentType());
+ putAttribute(attributes, "http.servlet.path", request.getServletPath());
+ putAttribute(attributes, "http.context.path", request.getContextPath());
+ putAttribute(attributes, "http.method", request.getMethod());
+ putAttribute(attributes, "http.local.addr", request.getLocalAddr());
+ putAttribute(attributes, HTTPUtils.HTTP_LOCAL_NAME, request.getLocalName());
+ final String queryString = request.getQueryString();
+ if (queryString != null) {
+ putAttribute(attributes, "http.query.string", URLDecoder.decode(queryString, charset));
+ }
+ putAttribute(attributes, HTTPUtils.HTTP_REMOTE_HOST, request.getRemoteHost());
+ putAttribute(attributes, "http.remote.addr", request.getRemoteAddr());
+ putAttribute(attributes, "http.remote.user", request.getRemoteUser());
+ putAttribute(attributes, "http.protocol", request.getProtocol());
+ putAttribute(attributes, HTTPUtils.HTTP_REQUEST_URI, request.getRequestURI());
+ putAttribute(attributes, "http.request.url", request.getRequestURL().toString());
+ putAttribute(attributes, "http.auth.type", request.getAuthType());
+
+ putAttribute(attributes, "http.requested.session.id", request.getRequestedSessionId());
+ final DispatcherType dispatcherType = request.getDispatcherType();
+ if (dispatcherType != null) {
+ putAttribute(attributes, "http.dispatcher.type", dispatcherType.name());
+ }
+ putAttribute(attributes, "http.character.encoding", request.getCharacterEncoding());
+ putAttribute(attributes, "http.locale", request.getLocale());
+ putAttribute(attributes, "http.server.name", request.getServerName());
+ putAttribute(attributes, HTTPUtils.HTTP_PORT, request.getServerPort());
+
+ final Enumeration<String> paramEnumeration = request.getParameterNames();
+ while (paramEnumeration.hasMoreElements()) {
+ final String paramName = paramEnumeration.nextElement();
+ final String value = request.getParameter(paramName);
+ attributes.put("http.param." + paramName, value);
+ }
+
+ final Cookie[] cookies = request.getCookies();
+ if (cookies != null) {
+ for (final Cookie cookie : cookies) {
+ final String name = cookie.getName();
+ final String cookiePrefix = "http.cookie." + name + ".";
+ attributes.put(cookiePrefix + "value", cookie.getValue());
+ attributes.put(cookiePrefix + "domain", cookie.getDomain());
+ attributes.put(cookiePrefix + "path", cookie.getPath());
+ attributes.put(cookiePrefix + "max.age", String.valueOf(cookie.getMaxAge()));
+ attributes.put(cookiePrefix + "version", String.valueOf(cookie.getVersion()));
+ attributes.put(cookiePrefix + "secure", String.valueOf(cookie.getSecure()));
+ }
+ }
+
+ if (queryString != null) {
+ final String[] params = URL_QUERY_PARAM_DELIMITER.split(queryString);
+ for (final String keyValueString : params) {
+ final int indexOf = keyValueString.indexOf("=");
+ if (indexOf < 0) {
+ // no =, then it's just a key with no value
+ attributes.put("http.query.param." + URLDecoder.decode(keyValueString, charset), "");
+ } else {
+ final String key = keyValueString.substring(0, indexOf);
+ final String value;
+
+ if (indexOf == keyValueString.length() - 1) {
+ value = "";
+ } else {
+ value = keyValueString.substring(indexOf + 1);
+ }
+
+ attributes.put("http.query.param." + URLDecoder.decode(key, charset), URLDecoder.decode(value, charset));
+ }
+ }
+ }
+ } catch (final UnsupportedEncodingException uee) {
+ throw new ProcessException("Invalid character encoding", uee); // won't happen because charset has been validated
+ }
+
+ final Enumeration<String> headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ final String headerName = headerNames.nextElement();
+ final String headerValue = request.getHeader(headerName);
+ putAttribute(attributes, "http.headers." + headerName, headerValue);
+ }
+
+ final Principal principal = request.getUserPrincipal();
+ if (principal != null) {
+ putAttribute(attributes, "http.principal.name", principal.getName());
+ }
+
+ final X509Certificate certs[] = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
+ final String subjectDn;
+ if (certs != null && certs.length > 0) {
+ final X509Certificate cert = certs[0];
+ subjectDn = cert.getSubjectDN().getName();
+ final String issuerDn = cert.getIssuerDN().getName();
+
+ putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn);
+ putAttribute(attributes, "http.issuer.dn", issuerDn);
+ } else {
+ subjectDn = null;
+ }
+
+ return session.putAllAttributes(flowFile, attributes);
+ }
- putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn);
- putAttribute(attributes, "http.issuer.dn", issuerDn);
- } else {
- subjectDn = null;
- }
+ private void forwardFlowFile(final ProcessContext context, final ProcessSession session,
+ HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) {
+ final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT);
+ session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(flowFile.getAttributes()),
+ "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis);
+ session.transfer(flowFile, REL_SUCCESS);
+ getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()});
+ }
- flowFile = session.putAllAttributes(flowFile, attributes);
+ private boolean registerRequest(final ProcessContext context, final ProcessSession session,
+ HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) {
final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);
+ String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
final boolean registered = contextMap.register(contextIdentifier, request, container.getResponse(), container.getContext());
+ if (registered)
+ return true;
- if (!registered) {
- getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE",
- new Object[]{request.getRemoteAddr()});
-
- try {
- container.getResponse().setStatus(Status.SERVICE_UNAVAILABLE.getStatusCode());
- container.getResponse().flushBuffer();
- container.getContext().complete();
- } catch (final Exception e) {
- getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}",
- new Object[]{request.getRemoteAddr(), e});
- }
+ getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE",
+ new Object[]{request.getRemoteAddr()});
- session.remove(flowFile);
- return;
+ try {
+ container.getResponse().setStatus(Status.SERVICE_UNAVAILABLE.getStatusCode());
+ container.getResponse().flushBuffer();
+ container.getContext().complete();
+ } catch (final Exception e) {
+ getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}",
+ new Object[]{request.getRemoteAddr(), e});
}
- final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(attributes), "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis);
- session.transfer(flowFile, REL_SUCCESS);
- getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()});
+ session.remove(flowFile);
+ return false;
+ }
+
+
+ protected void handleFlowContentStreamingError(final ProcessSession session, HttpRequestContainer container,
+ final HttpServletRequest request, Optional<FlowFile> flowFile, final Exception e) {
+ // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg.
+ // bad requests, the connection to the client is not closed. In order to address also these cases, we try
+ // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly
+ // processed and makes it aware that the connection can be closed.
+ getLogger().error("Failed to receive content from HTTP Request from {} due to {}",
+ new Object[]{request.getRemoteAddr(), e});
+ if (flowFile.isPresent())
+ session.remove(flowFile.get());
+
+ try {
+ HttpServletResponse response = container.getResponse();
+ response.sendError(Status.BAD_REQUEST.getStatusCode());
+ response.flushBuffer();
+ container.getContext().complete();
+ } catch (final IOException ioe) {
+ getLogger().warn("Failed to send HTTP response to {} due to {}",
+ new Object[]{request.getRemoteAddr(), ioe});
+ }
}
private void putAttribute(final Map<String, String> map, final String key, final Object value) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/d5f071b2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html
index 70adb85..6510e12 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html
@@ -39,5 +39,18 @@
The HandleHttpRequest Processor provides several Properties to configure which methods are supported, the paths that are
supported, and SSL configuration.
</p>
+
+ <p>
+ To handle requests with Content-Type: <i>multipart/form-data</i> containing multiple parts, additional attention needs to be paid.
+ Each <i>part</i> generates a FlowFile of its own. To each of these FlowFiles, some special attributes are written:
+ <ul>
+ <li>http.context.identifier</li>
+ <li>http.multipart.fragments.sequence.number</li>
+ <li>http.multipart.fragments.total.number</li>
+ </ul>
+ These attributes could be used to implement a gating mechanism for HandleHttpResponse processor to wait for the processing of FlowFiles
+ with sequence number <b>http.multipart.fragments.sequence.number</b> until up to <b>http.multipart.fragments.total.number</b> of flow files are processed,
+ belonging to the same <b>http.context.identifier</b>, which is unique to the request.
+ </p>
</body>
</html>
http://git-wip-us.apache.org/repos/asf/nifi/blob/d5f071b2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
index 8c98a1c..6352291 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
@@ -18,15 +18,19 @@ package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
+import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
@@ -36,6 +40,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.http.HttpContextMap;
+import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
@@ -48,6 +53,18 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
+import com.google.api.client.util.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
public class TestHandleHttpRequest {
private static Map<String, String> getTruststoreProperties() {
@@ -141,6 +158,214 @@ public class TestHandleHttpRequest {
+ /**
+  * Sends a multipart/form-data POST containing two form fields and three file parts, then verifies
+  * that five flow files are routed to the success relationship, that they all carry the same HTTP
+  * context identifier, and that the multipart attributes (name, filename, content-type, size,
+  * fragment numbering and content-disposition) are present on each of them.
+  */
@Test(timeout=10000)
+ public void testMultipartFormDataRequest() throws InitializationException, MalformedURLException, IOException, InterruptedException {
+     final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+     runner.setProperty(HandleHttpRequest.PORT, "0");
+
+     final MockHttpContextMap contextMap = new MockHttpContextMap();
+     runner.addControllerService("http-context-map", contextMap);
+     runner.enableControllerService(contextMap);
+     runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+
+     // trigger processor to stop but not shutdown.
+     runner.run(1, false);
+     try {
+         // The request is issued from a separate thread because the main thread has to keep
+         // triggering the processor while the client is blocked on the HTTP call.
+         final Thread httpThread = new Thread(new Runnable() {
+             @Override
+             public void run() {
+                 try {
+
+                     final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+
+                     // Each file part is backed by its own file on disk; file2 must not reuse the
+                     // file created for file1, otherwise both parts would upload the same content.
+                     MultipartBody multipartBody = new MultipartBody.Builder()
+                         .setType(MultipartBody.FORM)
+                         .addFormDataPart("p1", "v1")
+                         .addFormDataPart("p2", "v2")
+                         .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+                         .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-data.json", "{ \"name\":\"John\", \"age\":30 }")))
+                         .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+                         .build();
+
+                     Request request = new Request.Builder()
+                         .url(String.format("http://localhost:%s/my/path", port))
+                         .post(multipartBody).build();
+
+                     OkHttpClient client =
+                         new OkHttpClient.Builder()
+                             .readTimeout(3000, TimeUnit.MILLISECONDS)
+                             .writeTimeout(3000, TimeUnit.MILLISECONDS)
+                             .build();
+
+                     try (Response response = client.newCall(request).execute()) {
+                         Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful());
+                     }
+                 } catch (final Throwable t) {
+                     // NOTE(review): a failure raised on this thread is only reported on stderr;
+                     // it does not fail the JUnit test running on the main thread.
+                     t.printStackTrace();
+                     Assert.fail(t.toString());
+                 }
+             }
+         });
+         httpThread.start();
+
+         // Keep triggering the processor until the request has been turned into flow files.
+         while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
+             // process the request.
+             runner.run(1, false, false);
+         }
+
+         runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 5);
+         assertEquals(1, contextMap.size());
+
+         List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS);
+
+         // Part fragments are not processed in the order we submitted them.
+         // We cannot rely on the order we sent them in.
+         MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p1");
+         String contextId = mff.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
+         mff.assertAttributeEquals("http.multipart.name", "p1");
+         mff.assertAttributeExists("http.multipart.size");
+         mff.assertAttributeEquals("http.multipart.fragments.sequence.number", "1");
+         mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+         mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+
+         mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p2");
+         // each part generates a corresponding flow file - yet all parts are coming from the same request,
+         mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
+         mff.assertAttributeEquals("http.multipart.name", "p2");
+         mff.assertAttributeExists("http.multipart.size");
+         mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+         mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+         mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+
+         mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file1");
+         mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
+         mff.assertAttributeEquals("http.multipart.name", "file1");
+         mff.assertAttributeEquals("http.multipart.filename", "my-file-text.txt");
+         mff.assertAttributeEquals("http.headers.multipart.content-type", "text/plain");
+         mff.assertAttributeExists("http.multipart.size");
+         mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+         mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+         mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+
+         mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file2");
+         mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
+         mff.assertAttributeEquals("http.multipart.name", "file2");
+         mff.assertAttributeEquals("http.multipart.filename", "my-file-data.json");
+         mff.assertAttributeEquals("http.headers.multipart.content-type", "application/json");
+         mff.assertAttributeExists("http.multipart.size");
+         mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+         mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+         mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+
+         mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file3");
+         mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId);
+         mff.assertAttributeEquals("http.multipart.name", "file3");
+         mff.assertAttributeEquals("http.multipart.filename", "my-file-binary.bin");
+         mff.assertAttributeEquals("http.headers.multipart.content-type", "application/octet-stream");
+         mff.assertAttributeExists("http.multipart.size");
+         mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+         mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+         mff.assertAttributeExists("http.headers.multipart.content-disposition");
+     } finally {
+         // shut down the server
+         runner.run(1, true);
+     }
+ }
+
+ /**
+  * Sends the same multipart/form-data POST as the success-path test, but against a context map
+  * configured with {@code setRegisterSuccessfully(false)}. Verifies that no flow file is routed to
+  * the success relationship, that the context map stays empty, and that the client receives
+  * HTTP 503.
+  */
+ @Test(timeout=10000)
+ public void testMultipartFormDataRequestFailToRegisterContext() throws InitializationException, MalformedURLException, IOException, InterruptedException {
+     final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+     runner.setProperty(HandleHttpRequest.PORT, "0");
+
+     final MockHttpContextMap contextMap = new MockHttpContextMap();
+     contextMap.setRegisterSuccessfully(false);
+     runner.addControllerService("http-context-map", contextMap);
+     runner.enableControllerService(contextMap);
+     runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+
+     // trigger processor to stop but not shutdown.
+     runner.run(1, false);
+     try {
+         // 0 means "no outcome yet"; the client thread stores the HTTP status code here,
+         // or -1 when the request itself failed.
+         final AtomicInteger responseCode = new AtomicInteger(0);
+         final Thread httpThread = new Thread(new Runnable() {
+             @Override
+             public void run() {
+                 try {
+
+                     final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+
+                     // Each file part is backed by its own file on disk; file2 must not reuse the
+                     // file created for file1, otherwise both parts would upload the same content.
+                     MultipartBody multipartBody = new MultipartBody.Builder()
+                         .setType(MultipartBody.FORM)
+                         .addFormDataPart("p1", "v1")
+                         .addFormDataPart("p2", "v2")
+                         .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+                         .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-data.json", "{ \"name\":\"John\", \"age\":30 }")))
+                         .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+                         .build();
+
+                     Request request = new Request.Builder()
+                         .url(String.format("http://localhost:%s/my/path", port))
+                         .post(multipartBody).build();
+
+                     OkHttpClient client =
+                         new OkHttpClient.Builder()
+                             .readTimeout(3000, TimeUnit.MILLISECONDS)
+                             .writeTimeout(3000, TimeUnit.MILLISECONDS)
+                             .build();
+
+                     try (Response response = client.newCall(request).execute()) {
+                         responseCode.set(response.code());
+                     }
+                 } catch (final Throwable t) {
+                     t.printStackTrace();
+                     // Assert.fail() on this thread cannot fail the test; publish a sentinel instead so
+                     // the main thread stops polling and the 503 assertion below fails immediately
+                     // rather than spinning until the JUnit timeout.
+                     responseCode.set(-1);
+                 }
+             }
+         });
+         httpThread.start();
+
+         // Keep triggering the processor until the client thread has reported an outcome.
+         while (responseCode.get() == 0) {
+             // process the request.
+             runner.run(1, false, false);
+         }
+
+         runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 0);
+         assertEquals(0, contextMap.size());
+         Assert.assertEquals(503, responseCode.get());
+     } finally {
+         // shut down the server
+         runner.run(1, true);
+     }
+ }
+
+ /**
+  * Generates pseudo-random bytes to use as the payload of a binary multipart part.
+  *
+  * @param i number of bytes to generate
+  * @return a newly allocated array of {@code i} random bytes
+  */
+ private byte[] generateRandomBinaryData(int i) {
+     // size the buffer from the argument rather than a fixed 100 bytes
+     byte[] bytes = new byte[i];
+     new Random().nextBytes(bytes);
+     return bytes;
+ }
+
+
+ /**
+  * Creates a new temporary file and writes the given strings into it.
+  * <p>
+  * A uniquely named file is created on every call, so content is never appended to a file left
+  * behind by an earlier call or an earlier test run, and nothing is written into the working
+  * directory.
+  *
+  * @param fileName name used as the tail of the temporary file name, which keeps its extension
+  * @param lines strings written in order, back-to-back with no separator between them
+  * @return the temporary file, registered for deletion when the JVM exits
+  * @throws IOException if the file cannot be created or written
+  */
+ private File createTextFile(String fileName, String... lines) throws IOException {
+     // the constant prefix satisfies createTempFile's minimum prefix length of three characters
+     File file = File.createTempFile("nifi-test-", "-" + fileName);
+     file.deleteOnExit();
+     for (String string : lines) {
+         Files.append(string, file, Charsets.UTF_8);
+     }
+     return file;
+ }
+
+
+ /**
+  * Returns the first flow file whose attribute {@code attributeName} equals {@code attributeValue},
+  * failing the test when no such flow file is present.
+  *
+  * @param flowFilesForRelationship flow files to search
+  * @param attributeName name of the attribute to inspect
+  * @param attributeValue expected attribute value; must not be null
+  * @return the first matching flow file
+  */
+ protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) {
+     // Compare from the expected value: a flow file that lacks the attribute (getAttribute presumably
+     // returns null in that case) is then skipped instead of triggering a NullPointerException.
+     Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship, ff -> attributeValue.equals(ff.getAttribute(attributeName)));
+     Assert.assertTrue(String.format("No flow file found with attribute %s=%s", attributeName, attributeValue), optional.isPresent());
+     return optional.get();
+ }
+
+
+ @Test(timeout=10000)
public void testFailToRegister() throws InitializationException, MalformedURLException, IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
runner.setProperty(HandleHttpRequest.PORT, "0");