You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/10/11 19:42:00 UTC
[1/2] nifi git commit: NIFI-1490: multipart/form-data support for
ListenHTTP processor - introducing an in-memory-file-size-threshold,
above which the incoming file is written to local disk - using java.io.tmpdir
for such file writes - enhancing documentation
Repository: nifi
Updated Branches:
refs/heads/master 8398ea77b -> 5aa426358
NIFI-1490: multipart/form-data support for ListenHTTP processor
- introducing an in-memory-file-size-threshold, above which the incoming file is written to local disk
- using java.io.tmpdir for such file writes
- enhancing documentation
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c81a1351
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c81a1351
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c81a1351
Branch: refs/heads/master
Commit: c81a135161c7af106117dcc616706798c4564669
Parents: 8398ea7
Author: Endre Zoltan Kovacs <an...@protonmail.com>
Authored: Thu Sep 6 17:33:33 2018 +0200
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Oct 11 15:41:19 2018 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/ListenHTTP.java | 29 +-
.../standard/servlets/ListenHTTPServlet.java | 329 ++++++++++++-------
.../processors/standard/TestListenHTTP.java | 174 ++++++++--
3 files changed, 392 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81a1351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 209e6d1..39c91db 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -134,6 +134,25 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
.defaultValue(String.valueOf(HttpServletResponse.SC_OK))
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
+ public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+ .name("max-request-size")
+ .displayName("Max Request Size")
+ .description("The max size of the request. Only applies for requests with Content-Type: multipart/form-data, "
+ + "and is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .build();
+ public static final PropertyDescriptor IN_MEMORY_FILE_SIZE_THRESHOLD = new PropertyDescriptor.Builder()
+ .name("in-memory-file-size-threshold")
+ .displayName("In-Memory File Size Threshold") // short UI label; the explanatory text belongs in description()
+ .description("The threshold size, at which the contents of an incoming file would be written to disk. "
+ + "Only applies for requests with Content-Type: multipart/form-data. "
+ + "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("512 KB")
+ .build();
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
@@ -145,6 +164,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode";
+ public static final String CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE = "maxRequestSize";
+ public static final String CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD = "inMemoryFileSizeThreshold";
private volatile Server server = null;
private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>();
@@ -166,6 +187,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
descriptors.add(MAX_UNCONFIRMED_TIME);
descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
descriptors.add(RETURN_CODE);
+ descriptors.add(MAX_REQUEST_SIZE);
+ descriptors.add(IN_MEMORY_FILE_SIZE_THRESHOLD);
this.properties = Collections.unmodifiableList(descriptors);
}
@@ -214,6 +237,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
final int returnCode = context.getProperty(RETURN_CODE).asInteger();
+ long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue();
+ int inMemoryFileSizeThreshold = context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue();
throttlerRef.set(streamThrottler);
final boolean needClientAuth = sslContextService != null && sslContextService.getTrustStoreFile() != null;
@@ -295,7 +320,9 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
- contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE,returnCode);
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode);
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE, maxRequestSize);
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD, inMemoryFileSizeThreshold);
if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81a1351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 85de6f8..8803f0b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -20,10 +20,12 @@ import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
import java.security.cert.X509Certificate;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -34,14 +36,18 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
+
+import javax.servlet.MultipartConfigElement;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.Part;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.flowfile.FlowFile;
@@ -54,10 +60,15 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.ListenHTTP;
import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
import org.apache.nifi.stream.io.StreamThrottler;
+import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FlowFileUnpackager;
import org.apache.nifi.util.FlowFileUnpackagerV1;
import org.apache.nifi.util.FlowFileUnpackagerV2;
import org.apache.nifi.util.FlowFileUnpackagerV3;
+import org.eclipse.jetty.server.Request;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
@Path("")
@@ -94,6 +105,8 @@ public class ListenHTTPServlet extends HttpServlet {
private StreamThrottler streamThrottler;
private String basePath;
private int returnCode;
+ private long maxRequestSize;
+ private int inMemoryFileSizeThreshold;
@SuppressWarnings("unchecked")
@Override
@@ -108,6 +121,8 @@ public class ListenHTTPServlet extends HttpServlet {
this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH);
this.returnCode = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE);
+ this.maxRequestSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE);
+ this.inMemoryFileSizeThreshold = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD);
}
@Override
@@ -133,8 +148,6 @@ public class ListenHTTPServlet extends HttpServlet {
} while (sessionFactory == null);
final ProcessSession session = sessionFactory.createSession();
- FlowFile flowFile = null;
- String holdUuid = null;
String foundSubject = null;
try {
final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE;
@@ -191,134 +204,212 @@ public class ListenHTTPServlet extends HttpServlet {
logger.debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped);
}
- final AtomicBoolean hasMoreData = new AtomicBoolean(false);
- final FlowFileUnpackager unpackager;
- if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
- unpackager = new FlowFileUnpackagerV3();
- } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
- unpackager = new FlowFileUnpackagerV2();
- } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
- unpackager = new FlowFileUnpackagerV1();
+ Set<FlowFile> flowFileSet = null;
+ if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains("multipart/form-data")) {
+ flowFileSet = handleMultipartRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in);
} else {
- unpackager = null;
+ flowFileSet = handleRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in);
}
+ proceedFlow(request, response, session, foundSubject, createHold, flowFileSet);
+ } catch (final Throwable t) {
+ handleException(request, response, session, foundSubject, t);
+ }
+ }
- final Set<FlowFile> flowFileSet = new HashSet<>();
-
- do {
- final long startNanos = System.nanoTime();
- final Map<String, String> attributes = new HashMap<>();
- flowFile = session.create();
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream rawOut) throws IOException {
- try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
- if (unpackager == null) {
- IOUtils.copy(in, bos);
- hasMoreData.set(false);
- } else {
- attributes.putAll(unpackager.unpackageFlowFile(in, bos));
-
- if (destinationIsLegacyNiFi) {
- if (attributes.containsKey("nf.file.name")) {
- // for backward compatibility with old nifi...
- attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
- }
-
- if (attributes.containsKey("nf.file.path")) {
- attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
- }
- }
-
- hasMoreData.set(unpackager.hasMoreData());
- }
- }
- }
- });
-
- final long transferNanos = System.nanoTime() - startNanos;
- final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+ private void handleException(final HttpServletRequest request, final HttpServletResponse response,
+ final ProcessSession session, String foundSubject, final Throwable t) throws IOException {
+ session.rollback();
+ logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
+ response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
+ }
- // put metadata on flowfile
- final String nameVal = request.getHeader(CoreAttributes.FILENAME.key());
- if (StringUtils.isNotBlank(nameVal)) {
- attributes.put(CoreAttributes.FILENAME.key(), nameVal);
- }
+ private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject,
+ boolean destinationIsLegacyNiFi, String contentType, InputStream in) throws IOException, ServletException {
+ Set<FlowFile> flowFileSet = new HashSet<>();
+ String tempDir = System.getProperty("java.io.tmpdir");
+ request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, maxRequestSize, maxRequestSize, inMemoryFileSizeThreshold));
+ List<Part> requestParts = ImmutableList.copyOf(request.getParts());
+ for (int i = 0; i < requestParts.size(); i++) {
+ Part part = requestParts.get(i);
+ FlowFile flowFile = session.create();
+ try (OutputStream flowFileOoutputStream = session.write(flowFile)) {
+ StreamUtils.copy(part.getInputStream(), flowFileOoutputStream);
+ }
+ flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
+ flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size());
+ flowFileSet.add(flowFile);
+ }
+ return flowFileSet;
+ }
- // put arbitrary headers on flow file
- for (Enumeration<String> headerEnum = request.getHeaderNames();
- headerEnum.hasMoreElements();) {
- String headerName = headerEnum.nextElement();
- if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
- String headerValue = request.getHeader(headerName);
- attributes.put(headerName, headerValue);
- }
- }
+ private FlowFile savePartDetailsAsAttributes(ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) {
+ Map<String, String> attributes = new HashMap<>();
+ for (String headerName : part.getHeaderNames()) {
+ final String headerValue = part.getHeader(headerName);
+ putAttribute(attributes, "http.headers.multipart." + headerName, headerValue);
+ }
+ putAttribute(attributes, "http.multipart.size", part.getSize());
+ putAttribute(attributes, "http.multipart.content.type", part.getContentType());
+ putAttribute(attributes, "http.multipart.name", part.getName());
+ putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName());
+ putAttribute(attributes, "http.multipart.fragments.sequence.number", i+1);
+ putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount);
+ return session.putAllAttributes(flowFile, attributes);
+ }
- String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
- if (sourceSystemFlowFileIdentifier != null) {
- sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier;
+ private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session,
+ String foundSubject, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
+ FlowFile flowFile = null;
+ String holdUuid = null;
+ final AtomicBoolean hasMoreData = new AtomicBoolean(false);
+ final FlowFileUnpackager unpackager;
+ if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
+ unpackager = new FlowFileUnpackagerV3();
+ } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
+ unpackager = new FlowFileUnpackagerV2();
+ } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
+ unpackager = new FlowFileUnpackagerV1();
+ } else {
+ unpackager = null;
+ }
+
+ final Set<FlowFile> flowFileSet = new HashSet<>();
+
+ do {
+ final long startNanos = System.nanoTime();
+ final Map<String, String> attributes = new HashMap<>();
+ flowFile = session.create();
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream rawOut) throws IOException {
+ try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
+ if (unpackager == null) {
+ IOUtils.copy(in, bos);
+ hasMoreData.set(false);
+ } else {
+ attributes.putAll(unpackager.unpackageFlowFile(in, bos));
+
+ if (destinationIsLegacyNiFi) {
+ if (attributes.containsKey("nf.file.name")) {
+ // for backward compatibility with old nifi...
+ attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
+ }
+
+ if (attributes.containsKey("nf.file.path")) {
+ attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
+ }
+ }
+
+ hasMoreData.set(unpackager.hasMoreData());
+ }
+ }
+ }
+ });
+
+ final long transferNanos = System.nanoTime() - startNanos;
+ final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+ // put metadata on flowfile
+ final String nameVal = request.getHeader(CoreAttributes.FILENAME.key());
+ if (StringUtils.isNotBlank(nameVal)) {
+ attributes.put(CoreAttributes.FILENAME.key(), nameVal);
+ }
+
+ String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
+ if (sourceSystemFlowFileIdentifier != null) {
+ sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier;
+
+ // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's
+ // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event
+ attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+ }
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
+ session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
+ flowFileSet.add(flowFile);
+
+ if (holdUuid == null) {
+ holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+ }
+ } while (hasMoreData.get());
+ return flowFileSet;
+ }
- // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's
- // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event
- attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
- }
+ protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session,
+ String foundSubject, FlowFile flowFile) {
+ Map<String, String> attributes = new HashMap<>();
+ addMatchingRequestHeaders(request, attributes);
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
+ flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
+ flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
+ return flowFile;
+ }
- flowFile = session.putAllAttributes(flowFile, attributes);
- session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
- flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
- flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
- flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
- flowFileSet.add(flowFile);
+ private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) {
+ // put arbitrary headers on flow file
+ for (Enumeration<String> headerEnum = request.getHeaderNames();
+ headerEnum.hasMoreElements();) {
+ String headerName = headerEnum.nextElement();
+ if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
+ String headerValue = request.getHeader(headerName);
+ attributes.put(headerName, headerValue);
+ }
+ }
+ }
- if (holdUuid == null) {
- holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
- }
- } while (hasMoreData.get());
+ protected void proceedFlow(final HttpServletRequest request, final HttpServletResponse response,
+ final ProcessSession session, String foundSubject, final boolean createHold,
+ final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException {
+ if (createHold) {
+ String uuid = UUID.randomUUID().toString();
+
+ if (flowFileMap.containsKey(uuid)) {
+ uuid = UUID.randomUUID().toString();
+ }
+
+ final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost());
+ FlowFileEntryTimeWrapper previousWrapper;
+ do {
+ previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
+ if (previousWrapper != null) {
+ uuid = UUID.randomUUID().toString();
+ }
+ } while (previousWrapper != null);
+
+ response.setStatus(HttpServletResponse.SC_SEE_OTHER);
+ final String ackUri = "/" + basePath + "/holds/" + uuid;
+ response.addHeader(LOCATION_HEADER_NAME, ackUri);
+ response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
+ response.getOutputStream().write(ackUri.getBytes("UTF-8"));
+ if (logger.isDebugEnabled()) {
+ logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}",
+ new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
+ }
+ } else {
+ response.setStatus(this.returnCode);
+ logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success'",
+ new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject});
+
+ session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
+ session.commit();
+ }
+ }
- if (createHold) {
- String uuid = (holdUuid == null) ? UUID.randomUUID().toString() : holdUuid;
+ private void putAttribute(final Map<String, String> map, final String key, final Object value) {
+ if (value == null) {
+ return;
+ }
- if (flowFileMap.containsKey(uuid)) {
- uuid = UUID.randomUUID().toString();
- }
+ putAttribute(map, key, value.toString());
+ }
- final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost());
- FlowFileEntryTimeWrapper previousWrapper;
- do {
- previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
- if (previousWrapper != null) {
- uuid = UUID.randomUUID().toString();
- }
- } while (previousWrapper != null);
-
- response.setStatus(HttpServletResponse.SC_SEE_OTHER);
- final String ackUri = "/" + basePath + "/holds/" + uuid;
- response.addHeader(LOCATION_HEADER_NAME, ackUri);
- response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
- response.getOutputStream().write(ackUri.getBytes("UTF-8"));
- if (logger.isDebugEnabled()) {
- logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}",
- new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
- }
- } else {
- response.setStatus(this.returnCode);
- logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}",
- new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFile});
+ private void putAttribute(final Map<String, String> map, final String key, final String value) {
+ if (value == null) {
+ return;
+ }
- session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
- session.commit();
- }
- } catch (final Throwable t) {
- session.rollback();
- if (flowFile == null) {
- logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}",
- new Object[]{request.getRemoteHost(), foundSubject, t});
- } else {
- logger.error("Unable to receive file {} from Remote Host: [{}] SubjectDN [{}] due to {}",
- new Object[]{flowFile, request.getRemoteHost(), foundSubject, t});
- }
- response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
- }
- }
+ map.put(key, value);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81a1351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index 117a068..f8e9015 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -27,15 +27,31 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
@@ -66,7 +82,7 @@ public class TestListenHTTP {
public void setup() throws IOException {
proc = new ListenHTTP();
runner = TestRunners.newTestRunner(proc);
- availablePort = NetworkUtils.availablePort();;
+ availablePort = NetworkUtils.availablePort();
runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort));
runner.setVariable(BASEPATH_VARIABLE, HTTP_BASE_PATH);
@@ -181,8 +197,8 @@ public class TestListenHTTP {
private int executePOST(String message) throws Exception {
final SSLContextService sslContextService = runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, SSLContextService.class);
final boolean secure = (sslContextService != null);
- final String scheme = secure ? "https" : "http";
- final URL url = new URL(scheme + "://localhost:" + availablePort + "/" + HTTP_BASE_PATH);
+ String endpointUrl = buildUrl(secure);
+ final URL url = new URL(endpointUrl);
HttpURLConnection connection;
if (secure) {
@@ -207,6 +223,10 @@ public class TestListenHTTP {
return connection.getResponseCode();
}
+ private String buildUrl(final boolean secure) {
+ return String.format("%s://localhost:%s/%s", secure ? "https" : "http" , availablePort, HTTP_BASE_PATH);
+ }
+
private void testPOSTRequestsReceived(int returnCode) throws Exception {
final List<String> messages = new ArrayList<>();
messages.add("payload 1");
@@ -225,13 +245,29 @@ public class TestListenHTTP {
mockFlowFiles.get(3).assertContentEquals("payload 2");
}
+ private void startWebServerAndSendRequests(Runnable sendRequestToWebserver, int numberOfExpectedFlowFiles, int returnCode) throws Exception {
+ final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+ final ProcessContext context = runner.getProcessContext();
+ proc.createHttpServer(context);
+
+ new Thread(sendRequestToWebserver).start();
+
+ long responseTimeout = 10000;
+
+ int numTransferred = 0;
+ long startTime = System.currentTimeMillis();
+ while (numTransferred < numberOfExpectedFlowFiles && (System.currentTimeMillis() - startTime < responseTimeout)) {
+ proc.onTrigger(context, processSessionFactory);
+ numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
+ Thread.sleep(100);
+ }
+
+ runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, numberOfExpectedFlowFiles);
+ }
+
private void startWebServerAndSendMessages(final List<String> messages, int returnCode)
throws Exception {
- final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
- final ProcessContext context = runner.getProcessContext();
- proc.createHttpServer(context);
-
Runnable sendMessagestoWebServer = () -> {
try {
for (final String message : messages) {
@@ -244,20 +280,8 @@ public class TestListenHTTP {
fail("Not expecting error here.");
}
};
- new Thread(sendMessagestoWebServer).start();
-
- long responseTimeout = 10000;
-
- int numTransferred = 0;
- long startTime = System.currentTimeMillis();
- while (numTransferred < messages.size() && (System.currentTimeMillis() - startTime < responseTimeout)) {
- proc.onTrigger(context, processSessionFactory);
- numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
- Thread.sleep(100);
- }
-
- runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, messages.size());
+ startWebServerAndSendRequests(sendMessagestoWebServer, messages.size(), returnCode);
}
private SSLContextService configureProcessorSslContextService() throws InitializationException {
@@ -287,4 +311,114 @@ public class TestListenHTTP {
runner.setProperty(ListenHTTP.SSL_CONTEXT_SERVICE, SSL_CONTEXT_SERVICE_IDENTIFIER);
return sslContextService;
}
+
+
+ @Test(/*timeout=10000*/)
+ public void testMultipartFormDataRequest() throws Exception {
+ // Posts one multipart/form-data request with five parts (two form fields, three files) and expects one FlowFile per part.
+ runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+ runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+ runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_OK));
+
+ final SSLContextService sslContextService = runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, SSLContextService.class);
+ final boolean isSecure = (sslContextService != null);
+
+ Runnable sendRequestToWebserver = () -> {
+ try {
+ MultipartBody multipartBody = new MultipartBody.Builder().setType(MultipartBody.FORM)
+ .addFormDataPart("p1", "v1")
+ .addFormDataPart("p2", "v2")
+ .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+ .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-data.json", "{ \"name\":\"John\", \"age\":30 }")))
+ .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+ .build();
+
+ Request request =
+ new Request.Builder()
+ .url(buildUrl(isSecure))
+ .post(multipartBody)
+ .build();
+
+ int timeout = 3000;
+ OkHttpClient client = new OkHttpClient.Builder()
+ .readTimeout(timeout, TimeUnit.MILLISECONDS)
+ .writeTimeout(timeout, TimeUnit.MILLISECONDS)
+ .build();
+
+ try (Response response = client.newCall(request).execute()) {
+ Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful());
+ }
+ } catch (final Throwable t) {
+ t.printStackTrace();
+ Assert.fail(t.toString());
+ }
+ };
+
+ // NOTE(review): the Runnable runs on a separate thread, so its Assert failures do not reach JUnit; the transfer-count checks below are the real gate.
+ startWebServerAndSendRequests(sendRequestToWebserver, 5, 200);
+
+ runner.assertAllFlowFilesTransferred(ListenHTTP.RELATIONSHIP_SUCCESS, 5);
+ List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ListenHTTP.RELATIONSHIP_SUCCESS);
+ // Part fragments are not processed in the order we submitted them.
+ // We cannot rely on the order we sent them in.
+ MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p1");
+ mff.assertAttributeEquals("http.multipart.name", "p1");
+ mff.assertAttributeExists("http.multipart.size");
+ mff.assertAttributeEquals("http.multipart.fragments.sequence.number", "1");
+ mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+ mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+ mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p2");
+ mff.assertAttributeEquals("http.multipart.name", "p2");
+ mff.assertAttributeExists("http.multipart.size");
+ mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+ mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+ mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+ mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file1");
+ mff.assertAttributeEquals("http.multipart.name", "file1");
+ mff.assertAttributeEquals("http.multipart.filename", "my-file-text.txt");
+ mff.assertAttributeEquals("http.headers.multipart.content-type", "text/plain");
+ mff.assertAttributeExists("http.multipart.size");
+ mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+ mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+ mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+ mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file2");
+ mff.assertAttributeEquals("http.multipart.name", "file2");
+ mff.assertAttributeEquals("http.multipart.filename", "my-file-data.json");
+ mff.assertAttributeEquals("http.headers.multipart.content-type", "application/json");
+ mff.assertAttributeExists("http.multipart.size");
+ mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+ mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+ mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+ mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file3");
+ mff.assertAttributeEquals("http.multipart.name", "file3");
+ mff.assertAttributeEquals("http.multipart.filename", "my-file-binary.bin");
+ mff.assertAttributeEquals("http.headers.multipart.content-type", "application/octet-stream");
+ mff.assertAttributeExists("http.multipart.size");
+ mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+ mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+ mff.assertAttributeExists("http.headers.multipart.content-disposition");
+ }
+
+ private byte[] generateRandomBinaryData(int i) { // returns a new array of i random bytes
+ byte[] bytes = new byte[i]; // honor the requested size instead of a hard-coded 100
+ new Random().nextBytes(bytes);
+ return bytes;
+ }
+ private File createTextFile(String fileName, String... lines) throws IOException { // writes the given strings back to back (no separators) into a new file
+ File file = File.createTempFile("TestListenHTTP-", "-" + fileName); // fresh, uniquely named temp file: no stale content, no clash between calls
+ file.deleteOnExit();
+ for (String string : lines) {
+ Files.append(string, file, Charsets.UTF_8);
+ }
+ return file;
+ }
+ protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) { // first FlowFile whose attribute equals the value; fails the test if none
+ Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship, ff -> attributeValue != null && attributeValue.equals(ff.getAttribute(attributeName))); // null-safe: a FlowFile without the attribute is skipped, not an NPE
+ Assert.assertTrue("No FlowFile found with " + attributeName + "=" + attributeValue, optional.isPresent());
+ return optional.get();
+ }
}
[2/2] nifi git commit: NIFI-1490: better field naming / displayname
and description mix up fix
Posted by ma...@apache.org.
NIFI-1490: better field naming / displayname and description mix up fix
This closes #2994.
Signed-off-by: Mark Payne <ma...@hotmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5aa42635
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5aa42635
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5aa42635
Branch: refs/heads/master
Commit: 5aa426358802bfac91657fdb8a8a83094239ced8
Parents: c81a135
Author: Endre Zoltan Kovacs <ek...@hortonworks.com>
Authored: Mon Oct 8 13:10:37 2018 +0200
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Oct 11 15:41:38 2018 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/ListenHTTP.java | 30 +-
.../standard/servlets/ListenHTTPServlet.java | 420 +++++++++----------
2 files changed, 223 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa42635/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 39c91db..5ea9f3a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -134,21 +134,21 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
.defaultValue(String.valueOf(HttpServletResponse.SC_OK))
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
- public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
- .name("max-request-size")
- .displayName("Max Request Size")
+ public static final PropertyDescriptor MULTIPART_REQUEST_MAX_SIZE = new PropertyDescriptor.Builder()
+ .name("multipart-request-max-size")
+ .displayName("Multipart Request Max Size")
.description("The max size of the request. Only applies for requests with Content-Type: multipart/form-data, "
+ "and is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.build();
- public static final PropertyDescriptor IN_MEMORY_FILE_SIZE_THRESHOLD = new PropertyDescriptor.Builder()
- .name("in-memory-file-size-threshold")
- .displayName("The threshold size, at which the contents of an incoming file would be written to disk. "
+ public static final PropertyDescriptor MULTIPART_READ_BUFFER_SIZE = new PropertyDescriptor.Builder()
+ .name("multipart-read-buffer-size")
+ .displayName("Multipart Read Buffer Size")
+ .description("The threshold size, at which the contents of an incoming file would be written to disk. "
+ "Only applies for requests with Content-Type: multipart/form-data. "
+ "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.")
- .description("The threshold value for writing a file to disk.")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("512 KB")
@@ -164,8 +164,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode";
- public static final String CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE = "maxRequestSize";
- public static final String CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD = "inMemoryFileSizeThreshold";
+ public static final String CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE = "multipartRequestMaxSize";
+ public static final String CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE = "multipartReadBufferSize";
private volatile Server server = null;
private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>();
@@ -187,8 +187,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
descriptors.add(MAX_UNCONFIRMED_TIME);
descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
descriptors.add(RETURN_CODE);
- descriptors.add(MAX_REQUEST_SIZE);
- descriptors.add(IN_MEMORY_FILE_SIZE_THRESHOLD);
+ descriptors.add(MULTIPART_REQUEST_MAX_SIZE);
+ descriptors.add(MULTIPART_READ_BUFFER_SIZE);
this.properties = Collections.unmodifiableList(descriptors);
}
@@ -237,8 +237,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
final int returnCode = context.getProperty(RETURN_CODE).asInteger();
- long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue();
- int inMemoryFileSizeThreshold = context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue();
+ long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
+ int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
throttlerRef.set(streamThrottler);
final boolean needClientAuth = sslContextService != null && sslContextService.getTrustStoreFile() != null;
@@ -321,8 +321,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode);
- contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE, maxRequestSize);
- contextHandler.setAttribute(CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD, inMemoryFileSizeThreshold);
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE, requestMaxSize);
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE, readBufferSize);
if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa42635/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 8803f0b..07ccd69 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -16,6 +16,37 @@
*/
package org.apache.nifi.processors.standard.servlets;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processors.standard.ListenHTTP;
+import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
+import org.apache.nifi.stream.io.StreamThrottler;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.FlowFileUnpackager;
+import org.apache.nifi.util.FlowFileUnpackagerV1;
+import org.apache.nifi.util.FlowFileUnpackagerV2;
+import org.apache.nifi.util.FlowFileUnpackagerV3;
+import org.eclipse.jetty.server.Request;
+
+import javax.servlet.MultipartConfigElement;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.Part;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.MediaType;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -37,40 +68,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
-import javax.servlet.MultipartConfigElement;
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.Part;
-import javax.ws.rs.Path;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processors.standard.ListenHTTP;
-import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
-import org.apache.nifi.stream.io.StreamThrottler;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.FlowFileUnpackager;
-import org.apache.nifi.util.FlowFileUnpackagerV1;
-import org.apache.nifi.util.FlowFileUnpackagerV2;
-import org.apache.nifi.util.FlowFileUnpackagerV3;
-import org.eclipse.jetty.server.Request;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-
-
@Path("")
public class ListenHTTPServlet extends HttpServlet {
@@ -105,8 +102,8 @@ public class ListenHTTPServlet extends HttpServlet {
private StreamThrottler streamThrottler;
private String basePath;
private int returnCode;
- private long maxRequestSize;
- private int inMemoryFileSizeThreshold;
+ private long multipartRequestMaxSize;
+ private int multipartReadBufferSize;
@SuppressWarnings("unchecked")
@Override
@@ -121,8 +118,8 @@ public class ListenHTTPServlet extends HttpServlet {
this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH);
this.returnCode = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE);
- this.maxRequestSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE);
- this.inMemoryFileSizeThreshold = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD);
+ this.multipartRequestMaxSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE);
+ this.multipartReadBufferSize = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE);
}
@Override
@@ -204,11 +201,11 @@ public class ListenHTTPServlet extends HttpServlet {
logger.debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped);
}
- Set<FlowFile> flowFileSet = null;
+ Set<FlowFile> flowFileSet;
if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains("multipart/form-data")) {
- flowFileSet = handleMultipartRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in);
+ flowFileSet = handleMultipartRequest(request, session, foundSubject);
} else {
- flowFileSet = handleRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in);
+ flowFileSet = handleRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in);
}
proceedFlow(request, response, session, foundSubject, createHold, flowFileSet);
} catch (final Throwable t) {
@@ -217,199 +214,198 @@ public class ListenHTTPServlet extends HttpServlet {
}
private void handleException(final HttpServletRequest request, final HttpServletResponse response,
- final ProcessSession session, String foundSubject, final Throwable t) throws IOException {
- session.rollback();
- logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
- response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
+ final ProcessSession session, String foundSubject, final Throwable t) throws IOException {
+ session.rollback();
+ logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
+ response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
}
- private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject,
- boolean destinationIsLegacyNiFi, String contentType, InputStream in) throws IOException, ServletException {
- Set<FlowFile> flowFileSet = new HashSet<>();
- String tempDir = System.getProperty("java.io.tmpdir");
- request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, maxRequestSize, maxRequestSize, inMemoryFileSizeThreshold));
- List<Part> requestParts = ImmutableList.copyOf(request.getParts());
- for (int i = 0; i < requestParts.size(); i++) {
- Part part = requestParts.get(i);
- FlowFile flowFile = session.create();
- try (OutputStream flowFileOoutputStream = session.write(flowFile)) {
- StreamUtils.copy(part.getInputStream(), flowFileOoutputStream);
+ private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject) throws IOException, IllegalStateException, ServletException {
+ Set<FlowFile> flowFileSet = new HashSet<>();
+ String tempDir = System.getProperty("java.io.tmpdir");
+ request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize));
+ List<Part> requestParts = ImmutableList.copyOf(request.getParts());
+ for (int i = 0; i < requestParts.size(); i++) {
+ Part part = requestParts.get(i);
+ FlowFile flowFile = session.create();
+ try (OutputStream flowFileOutputStream = session.write(flowFile)) {
+ StreamUtils.copy(part.getInputStream(), flowFileOutputStream);
+ }
+ flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
+ flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size());
+ flowFileSet.add(flowFile);
}
- flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
- flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size());
- flowFileSet.add(flowFile);
- }
- return flowFileSet;
+ return flowFileSet;
}
- private FlowFile savePartDetailsAsAttributes(ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) {
- Map<String, String> attributes = new HashMap<>();
- for (String headerName : part.getHeaderNames()) {
- final String headerValue = part.getHeader(headerName);
- putAttribute(attributes, "http.headers.multipart." + headerName, headerValue);
- }
- putAttribute(attributes, "http.multipart.size", part.getSize());
- putAttribute(attributes, "http.multipart.content.type", part.getContentType());
- putAttribute(attributes, "http.multipart.name", part.getName());
- putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName());
- putAttribute(attributes, "http.multipart.fragments.sequence.number", i+1);
- putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount);
- return session.putAllAttributes(flowFile, attributes);
+ private FlowFile savePartDetailsAsAttributes(final ProcessSession session, final Part part, final FlowFile flowFile, final int sequenceNumber, final int allPartsCount) {
+ final Map<String, String> attributes = new HashMap<>();
+ for (String headerName : part.getHeaderNames()) {
+ final String headerValue = part.getHeader(headerName);
+ putAttribute(attributes, "http.headers.multipart." + headerName, headerValue);
+ }
+ putAttribute(attributes, "http.multipart.size", part.getSize());
+ putAttribute(attributes, "http.multipart.content.type", part.getContentType());
+ putAttribute(attributes, "http.multipart.name", part.getName());
+ putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName());
+ putAttribute(attributes, "http.multipart.fragments.sequence.number", sequenceNumber + 1);
+ putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount);
+ return session.putAllAttributes(flowFile, attributes);
}
private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session,
- String foundSubject, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
- FlowFile flowFile = null;
- String holdUuid = null;
- final AtomicBoolean hasMoreData = new AtomicBoolean(false);
- final FlowFileUnpackager unpackager;
- if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
- unpackager = new FlowFileUnpackagerV3();
- } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
- unpackager = new FlowFileUnpackagerV2();
- } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
- unpackager = new FlowFileUnpackagerV1();
- } else {
- unpackager = null;
- }
-
- final Set<FlowFile> flowFileSet = new HashSet<>();
-
- do {
- final long startNanos = System.nanoTime();
- final Map<String, String> attributes = new HashMap<>();
- flowFile = session.create();
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream rawOut) throws IOException {
- try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
- if (unpackager == null) {
- IOUtils.copy(in, bos);
- hasMoreData.set(false);
- } else {
- attributes.putAll(unpackager.unpackageFlowFile(in, bos));
-
- if (destinationIsLegacyNiFi) {
- if (attributes.containsKey("nf.file.name")) {
- // for backward compatibility with old nifi...
- attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
- }
-
- if (attributes.containsKey("nf.file.path")) {
- attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
- }
- }
-
- hasMoreData.set(unpackager.hasMoreData());
- }
- }
- }
- });
-
- final long transferNanos = System.nanoTime() - startNanos;
- final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-
- // put metadata on flowfile
- final String nameVal = request.getHeader(CoreAttributes.FILENAME.key());
- if (StringUtils.isNotBlank(nameVal)) {
- attributes.put(CoreAttributes.FILENAME.key(), nameVal);
- }
-
- String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
- if (sourceSystemFlowFileIdentifier != null) {
- sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier;
-
- // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's
- // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event
- attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
- }
-
- flowFile = session.putAllAttributes(flowFile, attributes);
- flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
- session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
- flowFileSet.add(flowFile);
-
- if (holdUuid == null) {
- holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
- }
- } while (hasMoreData.get());
- return flowFileSet;
+ String foundSubject, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
+ FlowFile flowFile = null;
+ String holdUuid = null;
+ final AtomicBoolean hasMoreData = new AtomicBoolean(false);
+ final FlowFileUnpackager unpackager;
+ if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
+ unpackager = new FlowFileUnpackagerV3();
+ } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
+ unpackager = new FlowFileUnpackagerV2();
+ } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
+ unpackager = new FlowFileUnpackagerV1();
+ } else {
+ unpackager = null;
+ }
+
+ final Set<FlowFile> flowFileSet = new HashSet<>();
+
+ do {
+ final long startNanos = System.nanoTime();
+ final Map<String, String> attributes = new HashMap<>();
+ flowFile = session.create();
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream rawOut) throws IOException {
+ try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
+ if (unpackager == null) {
+ IOUtils.copy(in, bos);
+ hasMoreData.set(false);
+ } else {
+ attributes.putAll(unpackager.unpackageFlowFile(in, bos));
+
+ if (destinationIsLegacyNiFi) {
+ if (attributes.containsKey("nf.file.name")) {
+ // for backward compatibility with old nifi...
+ attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
+ }
+
+ if (attributes.containsKey("nf.file.path")) {
+ attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
+ }
+ }
+
+ hasMoreData.set(unpackager.hasMoreData());
+ }
+ }
+ }
+ });
+
+ final long transferNanos = System.nanoTime() - startNanos;
+ final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+ // put metadata on flowfile
+ final String nameVal = request.getHeader(CoreAttributes.FILENAME.key());
+ if (StringUtils.isNotBlank(nameVal)) {
+ attributes.put(CoreAttributes.FILENAME.key(), nameVal);
+ }
+
+ String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
+ if (sourceSystemFlowFileIdentifier != null) {
+ sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier;
+
+ // If we received a UUID, we want to give the FlowFile a new UUID and register the sending system's
+ // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event
+ attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+ }
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
+ session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
+ flowFileSet.add(flowFile);
+
+ if (holdUuid == null) {
+ holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+ }
+ } while (hasMoreData.get());
+ return flowFileSet;
}
protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session,
- String foundSubject, FlowFile flowFile) {
- Map<String, String> attributes = new HashMap<>();
- addMatchingRequestHeaders(request, attributes);
- flowFile = session.putAllAttributes(flowFile, attributes);
- flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
- flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
- flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
- return flowFile;
+ String foundSubject, FlowFile flowFile) {
+ Map<String, String> attributes = new HashMap<>();
+ addMatchingRequestHeaders(request, attributes);
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
+ flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
+ flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
+ return flowFile;
}
private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) {
- // put arbitrary headers on flow file
- for (Enumeration<String> headerEnum = request.getHeaderNames();
- headerEnum.hasMoreElements();) {
- String headerName = headerEnum.nextElement();
- if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
- String headerValue = request.getHeader(headerName);
- attributes.put(headerName, headerValue);
- }
- }
+ // put arbitrary headers on flow file
+ for (Enumeration<String> headerEnum = request.getHeaderNames();
+ headerEnum.hasMoreElements(); ) {
+ String headerName = headerEnum.nextElement();
+ if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
+ String headerValue = request.getHeader(headerName);
+ attributes.put(headerName, headerValue);
+ }
+ }
}
protected void proceedFlow(final HttpServletRequest request, final HttpServletResponse response,
- final ProcessSession session, String foundSubject, final boolean createHold,
- final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException {
- if (createHold) {
- String uuid = UUID.randomUUID().toString();
-
- if (flowFileMap.containsKey(uuid)) {
- uuid = UUID.randomUUID().toString();
- }
-
- final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost());
- FlowFileEntryTimeWrapper previousWrapper;
- do {
- previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
- if (previousWrapper != null) {
- uuid = UUID.randomUUID().toString();
- }
- } while (previousWrapper != null);
-
- response.setStatus(HttpServletResponse.SC_SEE_OTHER);
- final String ackUri = "/" + basePath + "/holds/" + uuid;
- response.addHeader(LOCATION_HEADER_NAME, ackUri);
- response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
- response.getOutputStream().write(ackUri.getBytes("UTF-8"));
- if (logger.isDebugEnabled()) {
- logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}",
- new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
- }
- } else {
- response.setStatus(this.returnCode);
- logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success'",
- new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject});
-
- session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
- session.commit();
- }
+ final ProcessSession session, String foundSubject, final boolean createHold,
+ final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException {
+ if (createHold) {
+ String uuid = UUID.randomUUID().toString();
+
+ if (flowFileMap.containsKey(uuid)) {
+ uuid = UUID.randomUUID().toString();
+ }
+
+ final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost());
+ FlowFileEntryTimeWrapper previousWrapper;
+ do {
+ previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
+ if (previousWrapper != null) {
+ uuid = UUID.randomUUID().toString();
+ }
+ } while (previousWrapper != null);
+
+ response.setStatus(HttpServletResponse.SC_SEE_OTHER);
+ final String ackUri = "/" + basePath + "/holds/" + uuid;
+ response.addHeader(LOCATION_HEADER_NAME, ackUri);
+ response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
+ response.getOutputStream().write(ackUri.getBytes("UTF-8"));
+ if (logger.isDebugEnabled()) {
+ logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}",
+ new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
+ }
+ } else {
+ response.setStatus(this.returnCode);
+ logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success'",
+ new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject});
+
+ session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
+ session.commit();
+ }
}
private void putAttribute(final Map<String, String> map, final String key, final Object value) {
- if (value == null) {
- return;
- }
+ if (value == null) {
+ return;
+ }
- putAttribute(map, key, value.toString());
- }
+ putAttribute(map, key, value.toString());
+ }
private void putAttribute(final Map<String, String> map, final String key, final String value) {
- if (value == null) {
- return;
- }
+ if (value == null) {
+ return;
+ }
- map.put(key, value);
- }
+ map.put(key, value);
+ }
}