You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/20 15:01:49 UTC

[13/42] incubator-nifi git commit: Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
index 0000000,31e5105..da80546
mode 000000,100644..100644
--- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
+++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
@@@ -1,0 -1,293 +1,293 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.exception.FlowFileAccessException;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processors.standard.util.FileInfo;
+ import org.apache.nifi.processors.standard.util.FileTransfer;
+ import org.apache.nifi.processors.standard.util.SFTPTransfer;
+ import org.apache.nifi.util.ObjectHolder;
+ import org.apache.nifi.util.StopWatch;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ 
+ /**
+  * Base class for PutFTP and PutSFTP.
+  *
+  * @param <T> the {@link FileTransfer} implementation used to communicate with the remote system
+  */
+ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractProcessor {
+ 
+     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are successfully sent will be routed to success").build();
+     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that failed to send to the remote system; failure is usually looped back to this processor").build();
+     public static final Relationship REL_REJECT = new Relationship.Builder().name("reject").description("FlowFiles that were rejected by the destination system").build();
+ 
+     private final Set<Relationship> relationships;
+ 
+     /**
+      * Creates the processor and fixes its relationships (success, failure and reject)
+      * as an unmodifiable set.
+      */
+     public PutFileTransfer() {
+         super();
+         final Set<Relationship> rels = new HashSet<>();
+         Collections.addAll(rels, REL_SUCCESS, REL_FAILURE, REL_REJECT);
+         this.relationships = Collections.unmodifiableSet(rels);
+     }
+ 
+     /**
+      * @return the unmodifiable set of relationships assembled in the constructor
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return this.relationships;
+     }
+ 
+     /**
+      * Supplies the transfer client used by {@code onTrigger}. The client is obtained once per
+      * invocation inside a try-with-resources statement, so it is closed when that invocation ends.
+      *
+      * @param context the process context of the current invocation
+      * @return the transfer client to use for this invocation
+      */
+     protected abstract T getFileTransfer(final ProcessContext context);
+ 
+     /**
+      * Hook called by {@code onTrigger} right before the FlowFile content is read and sent,
+      * and only when the conflict check decided the file should be transferred.
+      * The default implementation does nothing.
+      *
+      * @param flowFile the FlowFile about to be sent
+      * @param context the process context of the current invocation
+      * @param transfer the transfer client that will send the file
+      * @throws IOException if the subclass fails; {@code onTrigger} then routes the FlowFile to failure
+      */
+     protected void beforePut(final FlowFile flowFile, final ProcessContext context, final T transfer) throws IOException {
+ 
+     }
+ 
+     /**
+      * Hook called by {@code onTrigger} right after the FlowFile content has been sent.
+      * The default implementation does nothing.
+      *
+      * @param flowFile the FlowFile that was sent
+      * @param context the process context of the current invocation
+      * @param transfer the transfer client that sent the file
+      * @throws IOException if the subclass fails; {@code onTrigger} then routes the FlowFile to failure
+      */
+     protected void afterPut(final FlowFile flowFile, final ProcessContext context, final T transfer) throws IOException {
+ 
+     }
+ 
+     /**
+      * Sends one or more FlowFiles to the remote host using a single transfer client.
+      * <p>
+      * Each FlowFile is checked for conflicts, optionally sent, routed to the relationship chosen by
+      * the conflict check, and the session is committed before the next FlowFile is pulled. Any
+      * IOException, FlowFileAccessException or ProcessException yields the processor, penalizes the
+      * current FlowFile and routes it to failure.
+      *
+      * @param context the process context supplying the configured properties
+      * @param session the session FlowFiles are pulled from and transferred through
+      */
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         // The hostname is evaluated against the first FlowFile only and reused for the whole batch.
+         final String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+ 
+         final int maxNumberOfFiles = context.getProperty(FileTransfer.BATCH_SIZE).asInteger();
+         int fileCount = 0;
+         try (final T transfer = getFileTransfer(context)) {
+             // Keeps going while the processor is scheduled, every relationship is available,
+             // fewer than Batch Size files have been handled and the session has another FlowFile.
+             do {
+                 final String rootPath = context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue();
+                 final String workingDirPath;
+                 if (rootPath == null) {
+                     workingDirPath = null;
+                 } else {
+                     File workingDirectory = new File(rootPath);
+                     // A path that does not start with a slash or backslash is resolved against the remote home directory.
+                     if (!workingDirectory.getPath().startsWith("/") && !workingDirectory.getPath().startsWith("\\")) {
+                         workingDirectory = new File(transfer.getHomeDirectory(flowFile), workingDirectory.getPath());
+                     }
+                     // Remote paths always use forward slashes, whatever java.io.File produced locally.
+                     workingDirPath = workingDirectory.getPath().replace("\\", "/");
+                 }
+ 
+                 final boolean rejectZeroByteFiles = context.getProperty(FileTransfer.REJECT_ZERO_BYTE).asBoolean();
+                 final ConflictResult conflictResult = identifyAndResolveConflictFile(context.getProperty(FileTransfer.CONFLICT_RESOLUTION).getValue(),
+                         transfer, workingDirPath, flowFile, rejectZeroByteFiles, logger);
+ 
+                 if (conflictResult.isTransfer()) {
+                     final StopWatch stopWatch = new StopWatch();
+                     stopWatch.start();
+ 
+                     beforePut(flowFile, context, transfer);
+                     // Final copy so the anonymous callback below can capture the current FlowFile.
+                     final FlowFile flowFileToTransfer = flowFile;
+                     // Carries the remote path returned by put() out of the callback.
+                     final ObjectHolder<String> fullPathRef = new ObjectHolder<>(null);
+                     session.read(flowFile, new InputStreamCallback() {
+                         @Override
+                         public void process(final InputStream in) throws IOException {
+                             try (final InputStream bufferedIn = new BufferedInputStream(in)) {
+                                 if (workingDirPath != null && context.getProperty(SFTPTransfer.CREATE_DIRECTORY).asBoolean()) {
+                                     transfer.ensureDirectoryExists(flowFileToTransfer, new File(workingDirPath));
+                                 }
+ 
+                                 fullPathRef.set(transfer.put(flowFileToTransfer, workingDirPath, conflictResult.getFileName(), bufferedIn));
+                             }
+                         }
+                     });
+                     afterPut(flowFile, context, transfer);
+ 
+                     stopWatch.stop();
+                     final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
+                     final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+                     logger.info("Successfully transfered {} to {} on remote host {} in {} milliseconds at a rate of {}",
+                             new Object[]{flowFile, fullPathRef.get(), hostname, millis, dataRate});
+ 
+                     // The provenance URI needs an absolute path component.
+                     String fullPathWithSlash = fullPathRef.get();
+                     if (!fullPathWithSlash.startsWith("/")) {
+                         fullPathWithSlash = "/" + fullPathWithSlash;
+                     }
+                     final String destinationUri = transfer.getProtocolName() + "://" + hostname + fullPathWithSlash;
+                     session.getProvenanceReporter().send(flowFile, destinationUri, millis);
+                 }
+ 
+                 if (conflictResult.isPenalize()) {
+                     flowFile = session.penalize(flowFile);
+                 }
+ 
+                 session.transfer(flowFile, conflictResult.getRelationship());
+                 // Committed per FlowFile, so a later failure in the batch does not undo earlier sends.
+                 session.commit();
 -            } while (isScheduled() && (getRelationships().size() == session.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != null));
++            } while (isScheduled() && (getRelationships().size() == context.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != null));
+         // NOTE(review): if closing `transfer` at the end of the try block can throw IOException, flowFile is
+         // either null or already committed when the handlers below run - confirm against FileTransfer.close().
+         } catch (final IOException e) {
+             context.yield();
+             logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e});
+             flowFile = session.penalize(flowFile);
+             session.transfer(flowFile, REL_FAILURE);
+         } catch (final FlowFileAccessException e) {
+             context.yield();
+             // Only the cause is logged here, not the FlowFileAccessException itself.
+             logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e.getCause()});
+             flowFile = session.penalize(flowFile);
+             session.transfer(flowFile, REL_FAILURE);
+         } catch (final ProcessException e) {
+             context.yield();
+             logger.error("Unable to transfer {} to remote host {} due to {}: {}; routing to failure", new Object[]{flowFile, hostname, e, e.getCause()});
+             flowFile = session.penalize(flowFile);
+             session.transfer(flowFile, REL_FAILURE);
+         }
+     }
+ 
+     /**
+      * Attempts to identify naming or content issues with a file before it is transferred and
+      * decides what should happen to the FlowFile.
+      * <p>
+      * Checks run in this order: optional zero-byte rejection, the "none" strategy shortcut (no remote
+      * lookup at all), absence of a remote entry with the same name, a remote directory with the same
+      * name (always rejected), and finally the configured resolution strategy. An unrecognized
+      * strategy leaves the defaults in place: send under the original name and route to success.
+      *
+      * @param conflictResolutionType the configured strategy; matched case-insensitively
+      * @param transfer the client used to look up (and, for REPLACE, delete) remote files
+      * @param path the remote directory the file would be written to; may be null when no remote path is configured
+      * @param flowFile the FlowFile about to be sent
+      * @param rejectZeroByteFiles whether empty FlowFiles are routed to reject
+      * @param logger the processor's logger
+      * @return the relationship to route to, whether to send the content, the filename to use and whether to penalize
+      * @throws IOException if a remote lookup or delete fails
+      */
+     private ConflictResult identifyAndResolveConflictFile(final String conflictResolutionType, final T transfer, final String path, final FlowFile flowFile, final boolean rejectZeroByteFiles, final ProcessorLog logger) throws IOException {
+         final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ 
+         // First, reject empty content when configured to; this is the only rejection that also penalizes.
+         if (rejectZeroByteFiles && flowFile.getSize() == 0) {
+             logger.warn("Rejecting {} because it is zero bytes", new Object[]{flowFile});
+             return new ConflictResult(REL_REJECT, false, fileName, true);
+         }
+ 
+         // Second, check if the user doesn't care about detecting naming conflicts ahead of time
+         if (conflictResolutionType.equalsIgnoreCase(FileTransfer.CONFLICT_RESOLUTION_NONE)) {
+             return new ConflictResult(REL_SUCCESS, true, fileName, false);
+         }
+ 
+         final FileInfo remoteFileInfo = transfer.getRemoteFileInfo(flowFile, path, fileName);
+         if (remoteFileInfo == null) {
+             // Nothing with that name exists remotely, so there is no conflict to resolve.
+             return new ConflictResult(REL_SUCCESS, true, fileName, false);
+         }
+ 
+         if (remoteFileInfo.isDirectory()) {
+             logger.info("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
+             return new ConflictResult(REL_REJECT, false, fileName, false);
+         }
+ 
+         logger.info("Discovered a filename conflict on the remote server for {} so handling using configured Conflict Resolution of {}",
+                 new Object[]{flowFile, conflictResolutionType});
+ 
+         // Locale.ROOT keeps the case mapping independent of the JVM's default locale; with a Turkish
+         // default locale a lowercase 'i' would otherwise upper-case to a dotted capital and match no case.
+         switch (conflictResolutionType.toUpperCase(java.util.Locale.ROOT)) {
+             case FileTransfer.CONFLICT_RESOLUTION_REJECT:
+                 logger.info("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
+                 return new ConflictResult(REL_REJECT, false, fileName, false);
+             case FileTransfer.CONFLICT_RESOLUTION_REPLACE:
+                 // The remote file is deleted here, before the replacement has been uploaded.
+                 transfer.deleteFile(path, fileName);
+                 logger.info("Resolving filename conflict for {} with remote server by deleting remote file and replacing with flow file", new Object[]{flowFile});
+                 return new ConflictResult(REL_SUCCESS, true, fileName, false);
+             case FileTransfer.CONFLICT_RESOLUTION_RENAME:
+                 // Try the prefixes "1." through "99." and take the first name that is free remotely.
+                 for (int i = 1; i < 100; i++) {
+                     final String possibleFileName = i + "." + fileName;
+                     if (transfer.getRemoteFileInfo(flowFile, path, possibleFileName) == null) {
+                         logger.info("Attempting to resolve filename conflict for {} on the remote server by using a newly generated filename of: {}", new Object[]{flowFile, possibleFileName});
+                         return new ConflictResult(REL_SUCCESS, true, possibleFileName, false);
+                     }
+                 }
+                 logger.info("Could not determine a unique name after 99 attempts for.  Switching resolution mode to REJECT for " + flowFile);
+                 return new ConflictResult(REL_REJECT, false, fileName, false);
+             case FileTransfer.CONFLICT_RESOLUTION_IGNORE:
+                 logger.info("Resolving conflict for {}  by not transferring file and and still considering the process a success.", new Object[]{flowFile});
+                 return new ConflictResult(REL_SUCCESS, false, fileName, false);
+             case FileTransfer.CONFLICT_RESOLUTION_FAIL:
+                 logger.info("Resolved filename conflict for {} as configured by routing to FAILURE relationship.", new Object[]{flowFile});
+                 return new ConflictResult(REL_FAILURE, false, fileName, true);
+             default:
+                 // Unknown strategy: keep the defaults, i.e. send under the original name.
+                 return new ConflictResult(REL_SUCCESS, true, fileName, false);
+         }
+     }
+ 
+     /**
+      * Immutable holder for the outcome of the conflict check: where to route the FlowFile,
+      * whether to send its content, which filename to use and whether to penalize it.
+      */
+     private static class ConflictResult {
+ 
+         final Relationship relationship;
+         final boolean transferFile;
+         final String newFileName;
+         final boolean penalizeFile;
+ 
+         /**
+          * @param destination the relationship the FlowFile is routed to
+          * @param shouldTransfer whether the content should be sent to the remote system
+          * @param fileName the filename to use on the remote system
+          * @param shouldPenalize whether the FlowFile should be penalized before routing
+          */
+         public ConflictResult(final Relationship destination, final boolean shouldTransfer, final String fileName, final boolean shouldPenalize) {
+             relationship = destination;
+             transferFile = shouldTransfer;
+             newFileName = fileName;
+             penalizeFile = shouldPenalize;
+         }
+ 
+         /** @return true if the content should be sent */
+         public boolean isTransfer() {
+             return this.transferFile;
+         }
+ 
+         /** @return true if the FlowFile should be penalized */
+         public boolean isPenalize() {
+             return this.penalizeFile;
+         }
+ 
+         /** @return the filename to use on the remote system */
+         public String getFileName() {
+             return this.newFileName;
+         }
+ 
+         /** @return the relationship the FlowFile is routed to */
+         public Relationship getRelationship() {
+             return this.relationship;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 0000000,cae61f0..1cf5f1f
mode 000000,100644..100644
--- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@@ -1,0 -1,320 +1,325 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard.servlets;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.security.cert.X509Certificate;
+ import java.util.Enumeration;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.regex.Pattern;
+ import java.util.zip.GZIPInputStream;
+ 
+ import javax.servlet.ServletConfig;
+ import javax.servlet.ServletContext;
+ import javax.servlet.ServletException;
+ import javax.servlet.http.HttpServlet;
+ import javax.servlet.http.HttpServletRequest;
+ import javax.servlet.http.HttpServletResponse;
+ import javax.ws.rs.Path;
+ import javax.ws.rs.core.MediaType;
+ 
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.stream.io.StreamThrottler;
+ import org.apache.nifi.logging.ProcessorLog;
++import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processors.standard.ListenHTTP;
+ import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
+ import org.apache.nifi.util.FlowFileUnpackager;
+ import org.apache.nifi.util.FlowFileUnpackagerV1;
+ import org.apache.nifi.util.FlowFileUnpackagerV2;
+ import org.apache.nifi.util.FlowFileUnpackagerV3;
+ import org.apache.commons.io.IOUtils;
+ import org.apache.commons.lang3.StringUtils;
+ 
+ @Path(ListenHTTP.URI)
+ public class ListenHTTPServlet extends HttpServlet {
+ 
+     private static final long serialVersionUID = 5329940480987723163L;
+ 
+     public static final String FLOWFILE_CONFIRMATION_HEADER = "x-prefer-acknowledge-uri";
+     public static final String LOCATION_HEADER_NAME = "Location";
+     public static final String DEFAULT_FOUND_SUBJECT = "none";
+     public static final String APPLICATION_FLOW_FILE_V1 = "application/flowfile";
+     public static final String APPLICATION_FLOW_FILE_V2 = "application/flowfile-v2";
+     public static final String APPLICATION_FLOW_FILE_V3 = "application/flowfile-v3";
+     public static final String LOCATION_URI_INTENT_NAME = "x-location-uri-intent";
+     public static final String LOCATION_URI_INTENT_VALUE = "flowfile-hold";
+     public static final int FILES_BEFORE_CHECKING_DESTINATION_SPACE = 5;
+     public static final String ACCEPT_HEADER_NAME = "Accept";
+     public static final String ACCEPT_HEADER_VALUE = APPLICATION_FLOW_FILE_V3 + "," + APPLICATION_FLOW_FILE_V2 + "," + APPLICATION_FLOW_FILE_V1 + ",*/*;q=0.8";
+     public static final String ACCEPT_ENCODING_NAME = "Accept-Encoding";
+     public static final String ACCEPT_ENCODING_VALUE = "gzip";
+     public static final String GZIPPED_HEADER = "flowfile-gzipped";
+     public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version";
+     public static final String PROTOCOL_VERSION = "3";
+ 
+     private final AtomicLong filesReceived = new AtomicLong(0L);
+     private final AtomicBoolean spaceAvailable = new AtomicBoolean(true);
+ 
+     private ProcessorLog logger;
+     private AtomicReference<ProcessSessionFactory> sessionFactoryHolder;
++    private volatile ProcessContext processContext;
+     private Pattern authorizedPattern;
+     private Pattern headerPattern;
+     private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
+     private StreamThrottler streamThrottler;
+ 
+     /**
+      * Initializes the servlet and caches the collaborators stored as servlet-context attributes
+      * under the {@code ListenHTTP.CONTEXT_ATTRIBUTE_*} keys.
+      *
+      * @param config the servlet configuration whose context carries those attributes
+      * @throws ServletException if the superclass initialization fails
+      */
+     @SuppressWarnings("unchecked")
+     @Override
+     public void init(final ServletConfig config) throws ServletException {
+         // GenericServlet requires overriders of init(ServletConfig) to call super, otherwise
+         // getServletConfig() and getServletContext() are not usable on this servlet.
+         super.init(config);
+ 
+         final ServletContext context = config.getServletContext();
+         this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER);
+         this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER);
++        this.processContext = (ProcessContext) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER);
+         this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN);
+         this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
+         this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
+         this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
+     }
+ 
+     /**
+      * Answers HEAD requests by advertising, through response headers, the accepted content
+      * encoding, the accepted content types and the transfer protocol version. No body is written.
+      */
+     @Override
+     protected void doHead(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
+         response.addHeader(ACCEPT_ENCODING_NAME, ACCEPT_ENCODING_VALUE);
+         response.addHeader(ACCEPT_HEADER_NAME, ACCEPT_HEADER_VALUE);
+         response.addHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION);
+     }
+ 
+     /**
+      * Receives one or more FlowFiles from the request body.
+      * <p>
+      * The request is answered with 503 when no relationship is available (checked on every
+      * {@link #FILES_BEFORE_CHECKING_DESTINATION_SPACE}-th request, and on every request while space was
+      * last seen unavailable) or when the waiting thread is interrupted, and with 403 when the first
+      * client certificate's subject DN does not match the authorized pattern. The body is unpacked
+      * according to its content type, or copied as-is for any other type. The resulting FlowFiles are
+      * either placed on hold under a generated URI (303) or transferred to success and committed (200).
+      * Any Throwable rolls the session back and is reported as 500.
+      *
+      * @param request the incoming POST request
+      * @param response the response used to report the outcome
+      * @throws ServletException declared by the servlet contract
+      * @throws IOException if writing the response fails
+      */
+     @Override
+     protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
++        final ProcessContext context = processContext;
++        
+         // Wait until a session factory has been published in the holder.
+         ProcessSessionFactory sessionFactory;
+         do {
+             sessionFactory = sessionFactoryHolder.get();
+             if (sessionFactory == null) {
+                 try {
+                     Thread.sleep(10);
+                 } catch (final InterruptedException e) {
+                     // Restore the interrupt flag and give up instead of spinning: once interrupted,
+                     // every further sleep() would throw immediately.
+                     Thread.currentThread().interrupt();
+                     response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+                     return;
+                 }
+             }
+         } while (sessionFactory == null);
+ 
+         final ProcessSession session = sessionFactory.createSession();
+         FlowFile flowFile = null;
+         String holdUuid = null;
+         String foundSubject = null;
+         try {
+             final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE;
+             if (n == 0 || !spaceAvailable.get()) {
 -                if (session.getAvailableRelationships().isEmpty()) {
++                if (context.getAvailableRelationships().isEmpty()) {
+                     spaceAvailable.set(false);
+                     if (logger.isDebugEnabled()) {
+                         logger.debug("Received request from " + request.getRemoteHost() + " but no space available; Indicating Service Unavailable");
+                     }
+                     response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+                     return;
+                 } else {
+                     spaceAvailable.set(true);
+                 }
+             }
+             response.setHeader("Content-Type", MediaType.TEXT_PLAIN);
+ 
+             final boolean contentGzipped = Boolean.parseBoolean(request.getHeader(GZIPPED_HEADER));
+ 
+             final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
+             foundSubject = DEFAULT_FOUND_SUBJECT;
+             if (certs != null && certs.length > 0) {
+                 // Both branches leave the loop, so only the first certificate is ever examined.
+                 for (final X509Certificate cert : certs) {
+                     foundSubject = cert.getSubjectDN().getName();
+                     if (authorizedPattern.matcher(foundSubject).matches()) {
+                         break;
+                     } else {
+                         logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + request.getRemoteHost());
+                         response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn");
+                         return;
+                     }
+                 }
+             }
+ 
+             final String destinationVersion = request.getHeader(PROTOCOL_VERSION_HEADER);
+             Integer protocolVersion = null;
+             if (destinationVersion != null) {
+                 try {
+                     protocolVersion = Integer.valueOf(destinationVersion);
+                 } catch (final NumberFormatException e) {
+                     // Value was invalid. Treat as if the header were missing.
+                 }
+             }
+ 
+             // A sender that supplies no (valid) protocol version header is treated as legacy.
+             final boolean destinationIsLegacyNiFi = (protocolVersion == null);
+             final boolean createHold = Boolean.parseBoolean(request.getHeader(FLOWFILE_CONFIRMATION_HEADER));
+             final String contentType = request.getContentType();
+ 
+             final InputStream unthrottled = contentGzipped ? new GZIPInputStream(request.getInputStream()) : request.getInputStream();
+ 
+             final InputStream in = (streamThrottler == null) ? unthrottled : streamThrottler.newThrottledInputStream(unthrottled);
+ 
+             if (logger.isDebugEnabled()) {
+                 logger.debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped);
+             }
+ 
+             final AtomicBoolean hasMoreData = new AtomicBoolean(false);
+             // A null unpackager means the body is not a FlowFile package and is copied verbatim.
+             final FlowFileUnpackager unpackager;
+             if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
+                 unpackager = new FlowFileUnpackagerV3();
+             } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
+                 unpackager = new FlowFileUnpackagerV2();
+             } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
+                 unpackager = new FlowFileUnpackagerV1();
+             } else {
+                 unpackager = null;
+             }
+ 
+             final Set<FlowFile> flowFileSet = new HashSet<>();
+ 
+             // One iteration per FlowFile contained in the request body.
+             do {
+                 final long startNanos = System.nanoTime();
+                 final Map<String, String> attributes = new HashMap<>();
+                 flowFile = session.create();
+                 flowFile = session.write(flowFile, new OutputStreamCallback() {
+                     @Override
+                     public void process(final OutputStream rawOut) throws IOException {
+                         try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
+                             if (unpackager == null) {
+                                 IOUtils.copy(in, bos);
+                                 hasMoreData.set(false);
+                             } else {
+                                 attributes.putAll(unpackager.unpackageFlowFile(in, bos));
+ 
+                                 if (destinationIsLegacyNiFi) {
+                                     if (attributes.containsKey("nf.file.name")) {
+                                         // for backward compatibility with old nifi...
+                                         attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
+                                     }
+ 
+                                     if (attributes.containsKey("nf.file.path")) {
+                                         attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
+                                     }
+                                 }
+ 
+                                 // remove deprecated FlowFile attribute that was used in older versions of NiFi
+                                 attributes.remove("parent.uuid");
+ 
+                                 hasMoreData.set(unpackager.hasMoreData());
+                             }
+                         }
+                     }
+                 });
+ 
+                 final long transferNanos = System.nanoTime() - startNanos;
+                 final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+ 
+                 // put metadata on flowfile
+                 final String nameVal = request.getHeader(CoreAttributes.FILENAME.key());
+                 if (StringUtils.isNotBlank(nameVal)) {
+                     attributes.put(CoreAttributes.FILENAME.key(), nameVal);
+                 }
+ 
+                 // put arbitrary headers on flow file
+                 for (Enumeration<String> headerEnum = request.getHeaderNames(); headerEnum.hasMoreElements();) {
+                     String headerName = headerEnum.nextElement();
+                     if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
+                         String headerValue = request.getHeader(headerName);
+                         attributes.put(headerName, headerValue);
+                     }
+                 }
+ 
+                 String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
+                 if (sourceSystemFlowFileIdentifier != null) {
+                     sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier;
+ 
+                     // If we received a UUID, we want to give the FlowFile a new UUID and register the sending system's
+                     // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event
+                     attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+                 }
+ 
+                 flowFile = session.putAllAttributes(flowFile, attributes);
+                 session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
+                 flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
+                 flowFileSet.add(flowFile);
+ 
+                 // The first FlowFile's UUID is the preferred identifier for a hold.
+                 if (holdUuid == null) {
+                     holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+                 }
+             } while (hasMoreData.get());
+ 
+             if (createHold) {
+                 String uuid = (holdUuid == null) ? UUID.randomUUID().toString() : holdUuid;
+ 
+                 if (flowFileMap.containsKey(uuid)) {
+                     uuid = UUID.randomUUID().toString();
+                 }
+ 
+                 // Retry with fresh identifiers until the hold is registered under an unused key.
+                 final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis());
+                 FlowFileEntryTimeWrapper previousWrapper;
+                 do {
+                     previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
+                     if (previousWrapper != null) {
+                         uuid = UUID.randomUUID().toString();
+                     }
+                 } while (previousWrapper != null);
+ 
+                 response.setStatus(HttpServletResponse.SC_SEE_OTHER);
+                 final String ackUri = ListenHTTP.URI + "/holds/" + uuid;
+                 response.addHeader(LOCATION_HEADER_NAME, ackUri);
+                 response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
+                 response.getOutputStream().write(ackUri.getBytes("UTF-8"));
+                 if (logger.isDebugEnabled()) {
+                     logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}",
+                             new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
+                 }
+             } else {
+                 response.setStatus(HttpServletResponse.SC_OK);
+                 logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}",
+                         new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFile});
+ 
+                 session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
+                 session.commit();
+             }
+         } catch (final Throwable t) {
+             session.rollback();
+             if (flowFile == null) {
+                 logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
+             } else {
+                 logger.error("Unable to receive file {} from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{flowFile, request.getRemoteHost(), foundSubject, t});
+             }
+             response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
index 0000000,a6402e4..ab4c978
mode 000000,100644..100644
--- a/nifi/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
+++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
@@@ -1,0 -1,138 +1,139 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import org.apache.nifi.processors.standard.DistributeLoad;
+ import org.apache.nifi.util.TestRunner;
+ import org.apache.nifi.util.TestRunners;
+ 
+ import org.junit.Assert;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ /**
+  * Unit tests for the {@link DistributeLoad} processor, driven through a
+  * {@link TestRunner}.
+  */
+ public class TestDistributeLoad {
+ 
+     /**
+      * Configures the slf4j simple logger through system properties before any
+      * test runs: default level "info", timestamps shown, and "debug" for the
+      * DistributeLoad logger key.
+      */
+     @BeforeClass
+     public static void before() {
+         System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+         System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+         // NOTE(review): this key omits the "org.apache." prefix of this package, so it
+         // may not match the processor's logger name - confirm.
+         System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DistributeLoad", "debug");
+     }
+ 
+     /**
+      * With 100 relationships and no weights configured, 101 FlowFiles are
+      * expected to give relationship "1" two FlowFiles and every other
+      * relationship exactly one.
+      */
+     @Test
+     public void testDefaultRoundRobin() {
+         final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
+         testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
+ 
+         for (int i = 0; i < 101; i++) {
+             testRunner.enqueue(new byte[0]);
+         }
+ 
+         testRunner.run(101);
+         testRunner.assertTransferCount("1", 2);
+         for (int i = 2; i <= 100; i++) {
+             testRunner.assertTransferCount(String.valueOf(i), 1);
+         }
+     }
+ 
+     /**
+      * With property "1" set to "5" and property "2" set to "3", 106 FlowFiles
+      * are expected to give relationship "1" five FlowFiles, relationship "2"
+      * three, and each of the remaining 98 relationships one (5 + 3 + 98 = 106).
+      */
+     @Test
+     public void testWeightedRoundRobin() {
+         final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
+         testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
+ 
+         testRunner.setProperty("1", "5");
+         testRunner.setProperty("2", "3");
+ 
+         for (int i = 0; i < 106; i++) {
+             testRunner.enqueue(new byte[0]);
+         }
+ 
+         testRunner.run(108);
+         testRunner.assertTransferCount("1", 5);
+         testRunner.assertTransferCount("2", 3);
+         for (int i = 3; i <= 100; i++) {
+             testRunner.assertTransferCount(String.valueOf(i), 1);
+         }
+     }
+ 
+     /**
+      * Exercises the dynamically added, relationship-named properties: values
+      * "0" and "-1" and property names "101", "0" and "-1" are expected to be
+      * rejected, while names "1" and "100" with positive values are accepted.
+      */
+     @Test
+     public void testValidationOnAddedProperties() {
+         final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
+         testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
+ 
+         testRunner.setProperty("1", "5");
+ 
+         // NOTE(review): Assert.fail throws AssertionError, which the catch blocks below
+         // also catch, so none of these checks can fail when setProperty returns
+         // normally - confirm how TestRunner reports an invalid value and tighten.
+         try {
+             testRunner.setProperty("1", "0");
+             Assert.fail("Allows property '1' to be set to '0'");
+         } catch (final AssertionError e) {
+             // expected behavior
+         }
+ 
+         try {
+             testRunner.setProperty("1", "-1");
+             Assert.fail("Allows property '1' to be set to '-1'");
+         } catch (final AssertionError e) {
+             // expected behavior
+         }
+ 
+         testRunner.setProperty("1", "101");
+         testRunner.setProperty("100", "5");
+ 
+         try {
+             testRunner.setProperty("101", "5");
+             Assert.fail("Allows property '101' to be set to '5'");
+         } catch (final AssertionError e) {
+             // expected behavior
+         }
+ 
+         try {
+             testRunner.setProperty("0", "5");
+             Assert.fail("Allows property '0' to be set to '5'");
+         } catch (final AssertionError e) {
+             // expected behavior
+         }
+ 
+         try {
+             testRunner.setProperty("-1", "5");
+             Assert.fail("Allows property '-1' to be set to '5'");
+         } catch (final AssertionError e) {
+             // expected behavior
+         }
+     }
+ 
+     /**
+      * With the next-available strategy and relationship "50" marked
+      * unavailable, 99 FlowFiles are expected to leave the queue empty, with
+      * one FlowFile on every relationship except "50", which receives none.
+      */
+     @Test
+     public void testNextAvailable() {
+         final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
+ 
+         testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS.getName(), "100");
+         testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_NEXT_AVAILABLE);
+ 
+         for (int i = 0; i < 99; i++) {
+             testRunner.enqueue(new byte[0]);
+         }
+ 
+         testRunner.setRelationshipUnavailable("50");
+ 
+         testRunner.run(101);
+         testRunner.assertQueueEmpty();
+ 
+         for (int i = 1; i <= 100; i++) {
+             testRunner.assertTransferCount(String.valueOf(i), (i == 50) ? 0 : 1);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
----------------------------------------------------------------------
diff --cc nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
index 0000000,9e04439..7fa183f
mode 000000,100644..100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
@@@ -1,0 -1,124 +1,132 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processor;
+ 
+ import java.util.Map;
++import java.util.Set;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.controller.ControllerServiceLookup;
+ 
+ /**
+  * <p>
+  * Provides a bridge between a Processor and the NiFi Framework
+  * </p>
+  *
+  * <p>
+  * <b>Note: </b>Implementations of this interface are NOT necessarily
+  * thread-safe.
+  * </p>
+  */
+ public interface ProcessContext {
+ 
+     /**
+      * Retrieves the current value set for the given descriptor, if a value is
+      * set - else uses the descriptor to determine the appropriate default value
+      *
+      * @param descriptor the descriptor of the property whose value is
+      * requested
+      * @return the value currently set for the descriptor, or the default value
+      * determined from the descriptor if no value is set
+      */
+     PropertyValue getProperty(PropertyDescriptor descriptor);
+ 
+     /**
+      * Retrieves the current value set for the property with the given name,
+      * if a value is set - else uses the property's descriptor to determine the
+      * appropriate default value
+      *
+      * @param propertyName the name of the property whose value is requested
+      * @return the value currently set for the named property, or the
+      * appropriate default value if no value is set
+      */
+     PropertyValue getProperty(String propertyName);
+ 
+     /**
+      * Creates and returns a {@link PropertyValue} object that can be used for
+      * evaluating the value of the given String
+      *
+      * @param rawValue the String whose value is to be evaluated
+      * @return a {@link PropertyValue} that can be used for evaluating
+      * <code>rawValue</code>
+      */
+     PropertyValue newPropertyValue(String rawValue);
+ 
+     /**
+      * <p>
+      * Causes the Processor not to be scheduled for some pre-configured amount
+      * of time. The duration of time for which the processor will not be
+      * scheduled is configured in the same manner as the processor's scheduling
+      * period.
+      * </p>
+      *
+      * <p>
+      * <b>Note: </b>This is NOT a blocking call and does not suspend execution
+      * of the current thread.
+      * </p>
+      */
+     void yield();
+ 
+     /**
+      * @return the maximum number of threads that may be executing this
+      * processor's code at any given time
+      */
+     int getMaxConcurrentTasks();
+ 
+     /**
+      * @return the annotation data configured for this processor
+      */
+     String getAnnotationData();
+ 
+     /**
+      * Returns a Map of all PropertyDescriptors to their configured values. This
+      * Map may or may not be modifiable, but modifying its values will not
+      * change the values of the processor's properties
+      *
+      * @return a Map of all PropertyDescriptors to their configured values
+      */
+     Map<PropertyDescriptor, String> getProperties();
+ 
+     /**
+      * Encrypts the given value using the password provided in the NiFi
+      * Properties
+      *
+      * @param unencrypted the value to encrypt
+      * @return the encrypted form of the given value
+      */
+     String encrypt(String unencrypted);
+ 
+     /**
+      * Decrypts the given value using the password provided in the NiFi
+      * Properties
+      *
+      * @param encrypted the value to decrypt
+      * @return the decrypted form of the given value
+      */
+     String decrypt(String encrypted);
+ 
+     /**
+      * Provides a {@code ControllerServiceLookup} that can be used to obtain a
+      * Controller Service
+      *
+      * @return a {@code ControllerServiceLookup} that can be used to obtain a
+      * Controller Service
+      */
+     ControllerServiceLookup getControllerServiceLookup();
++    
++    /**
++     * @return the set of all relationships for which space is available to
++     * receive new objects
++     */
++    Set<Relationship> getAvailableRelationships();
++
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
----------------------------------------------------------------------
diff --cc nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
index 0000000,09d1bd2..d3de916
mode 000000,100644..100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
@@@ -1,0 -1,719 +1,713 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processor;
+ 
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.file.Path;
+ import java.util.Collection;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.processor.exception.FlowFileAccessException;
+ import org.apache.nifi.processor.exception.FlowFileHandlingException;
+ import org.apache.nifi.processor.exception.MissingFlowFileException;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.provenance.ProvenanceReporter;
+ 
+ /**
+  * <p>
+  * A process session encompasses all the behaviors a processor can perform to
+  * obtain, clone, read, modify, and remove FlowFiles in an atomic unit. A process
+  * session is always tied to a single processor at any one time and ensures no
+  * FlowFile can ever be accessed by any more than one processor at a given time.
+  * The session also ensures that all FlowFiles are always accounted for. The
+  * creator of a ProcessSession is always required to manage the session.</p>
+  *
+  * <p>
+  * A session is not considered thread safe. The session supports a unit of work
+  * that is either committed or rolled back</p>
+  *
+  * <p>
+  * As noted on specific methods and for specific exceptions automated rollback
+  * will occur to ensure consistency of the repository. However, several
+  * situations can result in exceptions yet not cause automated rollback. In
+  * these cases the consistency of the repository will be retained but callers
+  * will be able to indicate whether it should result in rollback or continue on
+  * toward a commit.</p>
+  *
+  * <p>
+  * A process session instance may be used continuously. That is, after each
+  * commit or rollback, the session can be used again.</p>
+  *
+  * @author unattributed
+  */
+ public interface ProcessSession {
+ 
+     /**
+      * <p>
+      * Commits the current session ensuring all operations against FlowFiles
+      * within this session are atomically persisted. All FlowFiles operated on
+      * within this session must be accounted for by transfer or removal or the
+      * commit will fail.</p>
+      *
+      * <p>
+      * As soon as the commit completes the session is again ready to be used</p>
+      *
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session.
+      * @throws FlowFileHandlingException if not all FlowFiles acted upon within
+      * this session are accounted for by user code such that they have a
+      * transfer identified or were marked for removal. Automated rollback
+      * occurs.
+      * @throws ProcessException if some general fault occurs while persisting
+      * the session. Initiates automatic rollback. The root cause can be obtained
+      * via <code>Exception.getCause()</code>
+      */
+     void commit();
+ 
+     /**
+      * Reverts any changes made during this session. All FlowFiles are restored
+      * back to their initial session state and back to their original queues. If
+      * this session is already committed or rolled back then no changes will
+      * occur. This method can be called any number of times. Calling this method
+      * is identical to calling {@link #rollback(boolean)} passing
+      * <code>false</code> as the parameter.
+      */
+     void rollback();
+ 
+     /**
+      * Reverts any changes made during this session. All FlowFiles are restored
+      * back to their initial session state and back to their original queues,
+      * after optionally being penalized. If this session is already committed or
+      * rolled back then no changes will occur. This method can be called any
+      * number of times.
+      *
+      * @param penalize whether or not the FlowFiles that are being restored back
+      * to their queues should be penalized
+      */
+     void rollback(boolean penalize);
+ 
+     /**
+      * Adjusts counter data for the given counter name and takes care of
+      * registering the counter if not already present. Unless
+      * <code>immediate</code> is true, the adjustment occurs only if and when
+      * the ProcessSession is committed.
+      *
+      * @param name the name of the counter
+      * @param delta the delta by which to modify the counter (+ or -)
+      * @param immediate if true, the counter will be updated immediately,
+      * without regard to whether the ProcessSession is committed or rolled back;
+      * otherwise, the counter will be incremented only if and when the
+      * ProcessSession is committed.
+      */
+     void adjustCounter(String name, long delta, boolean immediate);
+ 
+     /**
+      * @return FlowFile that is next highest priority FlowFile to process.
+      * Otherwise returns null.
+      */
+     FlowFile get();
+ 
+     /**
+      * Returns up to <code>maxResults</code> FlowFiles from the work queue. If
+      * no FlowFiles are available, returns an empty list. Will not return null.
+      * If multiple incoming queues are present, the behavior is unspecified in
+      * terms of whether all queues or only a single queue will be polled in a
+      * single call.
+      *
+      * @param maxResults the maximum number of FlowFiles to return
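+      * @return up to <code>maxResults</code> FlowFiles from the work queue, or
+      * an empty list if no FlowFiles are available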
+      * @throws IllegalArgumentException if <code>maxResults</code> is less than
+      * 0
+      */
+     List<FlowFile> get(int maxResults);
+ 
+     /**
+      * <p>
+      * Returns all FlowFiles from all of the incoming queues for which the given
+      * {@link FlowFileFilter} indicates should be accepted. Calls to this method
+      * provide exclusive access to the underlying queues. I.e., no other thread
+      * will be permitted to pull FlowFiles from this Processor's queues or add
+      * FlowFiles to this Processor's incoming queues until this method call has
+      * returned.
+      * </p>
+      *
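+      * @param filter the filter that indicates which FlowFiles should be
+      * accepted
+      * @return all FlowFiles from the incoming queues that the filter indicates
+      * should be accepted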
+      */
+     List<FlowFile> get(FlowFileFilter filter);
+ 
+     /**
+      * @return the QueueSize that represents the number of FlowFiles and their
+      * combined data size for all FlowFiles waiting to be processed by the
+      * Processor that owns this ProcessSession, regardless of which Connection
+      * the FlowFiles live on
+      */
+     QueueSize getQueueSize();
+ 
+     /**
 -     * @return the set of all relationships for which space is available to
 -     * receive new objects
 -     */
 -    Set<Relationship> getAvailableRelationships();
 -
 -    /**
+      * Creates a new FlowFile in the repository with no content and without any
+      * linkage to a parent FlowFile. This method is appropriate only when data
+      * is received or created from an external system. Otherwise, this method
+      * should be avoided and should instead use {@link #create(FlowFile)} or
+      * {@link #create(Collection)}.
+      *
+      * When this method is used, a Provenance CREATE or RECEIVE Event should be
+      * generated. See the {@link #getProvenanceReporter()} method and
+      * {@link ProvenanceReporter} class for more information
+      *
+      * @return newly created FlowFile
+      */
+     FlowFile create();
+ 
+     /**
+      * Creates a new FlowFile in the repository with no content but with a
+      * parent linkage to <code>parent</code>. The newly created FlowFile will
+      * inherit all of the parent's attributes except for the UUID. This method
+      * will automatically generate a Provenance FORK event or a Provenance JOIN
+      * event, depending on whether or not other FlowFiles are generated from the
+      * same parent before the ProcessSession is committed.
+      *
+      * @param parent
+      * @return
+      */
+     FlowFile create(FlowFile parent);
+ 
+     /**
+      * Creates a new FlowFile in the repository with no content but with a
+      * parent linkage to the FlowFiles specified by the parents Collection. The
+      * newly created FlowFile will inherit all of the attributes that are in
+      * common to all parents (except for the UUID, which will be in common if
+      * only a single parent exists). This method will automatically generate a
+      * Provenance JOIN event.
+      *
+      * @param parents
+      * @return
+      */
+     FlowFile create(Collection<FlowFile> parents);
+ 
+     /**
+      * Creates a new FlowFile that is a clone of the given FlowFile as of the
+      * time this is called, both in content and attributes. This method
+      * automatically emits a Provenance CLONE Event.
+      *
+      * @param example FlowFile to be the source of cloning - given FlowFile must
+      * be a part of the given session
+      * @return FlowFile that is a clone of the given example
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws MissingFlowFileException if the given FlowFile content cannot be
+      * found. The FlowFile should no longer be referenced, will be internally
+      * destroyed, and the session is automatically rolled back and what is left
+      * of the FlowFile is destroyed.
+      * @throws FlowFileAccessException if some IO problem occurs accessing
+      * FlowFile content
+      * @throws NullPointerException if the argument is null
+      */
+     FlowFile clone(FlowFile example);
+ 
+     /**
+      * Creates a new FlowFile whose parent is the given FlowFile. The content of
+      * the new FlowFile will be a subset of the byte sequence of the given
+      * FlowFile starting at the specified offset and with the length specified.
+      * The new FlowFile will contain all of the attributes of the original. This
+      * method automatically emits a Provenance FORK Event (or a Provenance CLONE
+      * Event, if the offset is 0 and the size is exactly equal to the size of
+      * the example FlowFile).
+      *
+      * @param example
+      * @param offset
+      * @param size
+      * @return a FlowFile with the specified size whose parent is first argument
+      * to this function
+      *
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session, or if the
+      * specified offset + size exceeds that of the size of the example FlowFile.
+      * Automatic rollback will occur.
+      * @throws MissingFlowFileException if the given FlowFile content cannot be
+      * found. The FlowFile should no longer be referenced, will be internally
+      * destroyed, and the session is automatically rolled back and what is left
+      * of the FlowFile is destroyed.
+      */
+     FlowFile clone(FlowFile example, long offset, long size);
+ 
+     /**
+      * Sets a penalty for the given FlowFile which will make it unavailable to
+      * be operated on any further during the penalty period.
+      *
+      * @param flowFile to penalize
+      * @return FlowFile the new FlowFile reference to use
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws NullPointerException if the argument is null
+      */
+     FlowFile penalize(FlowFile flowFile);
+ 
+     /**
+      * Updates the given FlowFile's attributes with the given key/value pair. If
+      * the key is named {@code uuid}, this attribute will be ignored.
+      *
+      * @param flowFile to update
+      * @param key of attribute
+      * @param value of attribute
+      * @return FlowFile the updated FlowFile
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws NullPointerException if an argument is null
+      */
+     FlowFile putAttribute(FlowFile flowFile, String key, String value);
+ 
+     /**
+      * Updates the given FlowFile's attributes with the given key/value pairs. If
+      * the map contains a key named {@code uuid}, this attribute will be
+      * ignored.
+      *
+      * @param flowFile to update
+      * @param attributes the attributes to add to the given FlowFile
+      * @return FlowFile the updated FlowFile
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws NullPointerException if an argument is null
+      */
+     FlowFile putAllAttributes(FlowFile flowFile, Map<String, String> attributes);
+ 
+     /**
+      * Removes the given FlowFile attribute with the given key. If the key is
+      * named {@code uuid}, this method will return the same FlowFile without
+      * removing any attribute.
+      *
+      * @param flowFile to update
+      * @param key of attribute
+      * @return FlowFile the updated FlowFile
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws NullPointerException if the argument is null
+      */
+     FlowFile removeAttribute(FlowFile flowFile, String key);
+ 
+     /**
+      * Removes the attributes with the given keys from the given FlowFile. If
+      * the set of keys contains the value {@code uuid}, this key will be ignored
+      *
+      * @param flowFile to update
+      * @param keys of attribute
+      * @return FlowFile the updated FlowFile
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws NullPointerException if the argument is null
+      */
+     FlowFile removeAllAttributes(FlowFile flowFile, Set<String> keys);
+ 
+     /**
+      * Remove all attributes from the given FlowFile that have keys which match
+      * the given pattern. If the pattern matches the key {@code uuid}, this key
+      * will not be removed.
+      *
+      * @param flowFile to update
+      * @param keyPattern may be null; if supplied is matched against each of the
+      * FlowFile attribute keys
+      * @return FlowFile containing only attributes which did not meet the key
+      * pattern
+      */
+     FlowFile removeAllAttributes(FlowFile flowFile, Pattern keyPattern);
+ 
+     /**
+      * Transfers the given FlowFile to the appropriate destination processor
+      * work queue(s) based on the given relationship. If the relationship leads
+      * to more than one destination the state of the FlowFile is replicated such
+      * that each destination receives an exact copy of the FlowFile though each
+      * will have its own unique identity. The destination processors will not be
+      * able to operate on the given FlowFile until this session is committed or
+      * until the ownership of the session is migrated to another processor. If
+      * ownership of the session is passed to a destination processor then that
+      * destination processor will have immediate visibility of the transferred
+      * FlowFiles within the session.
+      *
+      * @param flowFile
+      * @param relationship
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws NullPointerException if the argument is null
+      * @throws IllegalArgumentException if given relationship is not a known or
+      * registered relationship
+      */
+     void transfer(FlowFile flowFile, Relationship relationship);
+ 
+     /**
+      * Transfers the given FlowFile back to the work queue from which it was
+      * pulled. The processor will not be able to operate on the given FlowFile
+      * until this session is committed. Any modifications that have been made to
+      * the FlowFile will be maintained. FlowFiles that are created by the
+      * processor cannot be transferred back to themselves via this method.
+      *
+      * @param flowFile
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws IllegalArgumentException if the FlowFile was created by this
+      * processor
+      * @throws NullPointerException if the argument is null
+      */
+     void transfer(FlowFile flowFile);
+ 
+     /**
+      * Transfers the given FlowFiles back to the work queues from which the
+      * FlowFiles were pulled. The processor will not be able to operate on the
+      * given FlowFile until this session is committed. Any modifications that
+      * have been made to the FlowFile will be maintained. FlowFiles that are
+      * created by the processor cannot be transferred back to themselves via
+      * this method.
+      *
+      * @param flowFiles
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFiles are already
+      * transferred or removed or don't belong to this session. Automatic
+      * rollback will occur.
+      * @throws IllegalArgumentException if the FlowFile was created by this
+      * processor
+      * @throws NullPointerException if the argument is null
+      */
+     void transfer(Collection<FlowFile> flowFiles);
+ 
+     /**
+      * Transfers the given FlowFile to the appropriate destination processor
+      * work queue(s) based on the given relationship. If the relationship leads
+      * to more than one destination the state of the FlowFile is replicated such
+      * that each destination receives an exact copy of the FlowFile though each
+      * will have its own unique identity. The destination processors will not be
+      * able to operate on the given FlowFile until this session is committed or
+      * until the ownership of the session is migrated to another processor. If
+      * ownership of the session is passed to a destination processor then that
+      * destination processor will have immediate visibility of the transferred
+      * FlowFiles within the session.
+      *
+      * @param flowFiles
+      * @param relationship
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws NullPointerException if the argument is null
+      * @throws IllegalArgumentException if given relationship is not a known or
+      * registered relationship
+      */
+     void transfer(Collection<FlowFile> flowFiles, Relationship relationship);
+ 
+     /**
+      * Ends the managed persistence for the given FlowFile. The persistent
+      * attributes for the FlowFile are deleted, and so is the content assuming
+      * nothing else references it. The FlowFile will no longer be available for
+      * further operation.
+      *
+      * @param flowFile the FlowFile whose managed persistence is ended
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      */
+     void remove(FlowFile flowFile);
+ 
+     /**
+      * Ends the managed persistence for the given FlowFiles. The persistent
+      * attributes for each FlowFile are deleted, and so is the content assuming
+      * nothing else references it. The FlowFiles will no longer be available for
+      * further operation.
+      *
+      * @param flowFiles the FlowFiles whose managed persistence is ended
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if any of the given FlowFiles is
+      * already transferred or removed or doesn't belong to this session.
+      * Automatic rollback will occur.
+      */
+     void remove(Collection<FlowFile> flowFiles);
+ 
+     /**
+      * Executes the given callback against the contents corresponding to the
+      * given FlowFile.
+      *
+      * @param source the FlowFile whose contents the callback is executed
+      * against
+      * @param reader the callback to execute against the FlowFile's contents
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws MissingFlowFileException if the given FlowFile content cannot be
+      * found. The FlowFile should no longer be referenced, will be internally
+      * destroyed, and the session is automatically rolled back and what is left
+      * of the FlowFile is destroyed.
+      * @throws FlowFileAccessException if some IO problem occurs accessing
+      * FlowFile content
+      */
+     void read(FlowFile source, InputStreamCallback reader);
+ 
+     /**
+      * Combines the content of all given source FlowFiles into a single given
+      * destination FlowFile.
+      *
+      * @param sources the FlowFiles whose content is combined
+      * @param destination the FlowFile into which the combined content is
+      * placed
+      * @return updated destination FlowFile (new size, etc...)
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws IllegalArgumentException if the given destination is contained
+      * within the sources
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws MissingFlowFileException if the given FlowFile content cannot be
+      * found. The FlowFile should no longer be referenced, will be internally
+      * destroyed, and the session is automatically rolled back and what is left
+      * of the FlowFile is destroyed.
+      * @throws FlowFileAccessException if some IO problem occurs accessing
+      * FlowFile content. The state of the destination will be as it was prior to
+      * this call.
+      */
+     FlowFile merge(Collection<FlowFile> sources, FlowFile destination);
+ 
+     /**
+      * Combines the content of all given source FlowFiles into a single given
+      * destination FlowFile, optionally surrounding the merged output with a
+      * header and footer and separating the merged objects with a demarcator.
+      *
+      * @param sources the FlowFiles whose content is combined
+      * @param destination the FlowFile into which the combined content is
+      * placed
+      * @param header bytes that will be added to the beginning of the merged
+      * output. May be null or empty.
+      * @param footer bytes that will be added to the end of the merged output.
+      * May be null or empty.
+      * @param demarcator bytes that will be placed in between each object merged
+      * together. May be null or empty.
+      * @return updated destination FlowFile (new size, etc...)
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws IllegalArgumentException if the given destination is contained
+      * within the sources
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws MissingFlowFileException if the given FlowFile content cannot be
+      * found. The FlowFile should no longer be referenced, will be internally
+      * destroyed, and the session is automatically rolled back and what is left
+      * of the FlowFile is destroyed.
+      * @throws FlowFileAccessException if some IO problem occurs accessing
+      * FlowFile content. The state of the destination will be as it was prior to
+      * this call.
+      */
+     FlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator);
+ 
+     /**
+      * Executes the given callback against the content corresponding to the
+      * given FlowFile.
+      *
+      * @param source the FlowFile whose content the callback is executed against
+      * @param writer the callback to execute against the FlowFile's content
+      * @return updated FlowFile
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws MissingFlowFileException if the given FlowFile content cannot be
+      * found. The FlowFile should no longer be referenced, will be internally
+      * destroyed, and the session is automatically rolled back and what is left
+      * of the FlowFile is destroyed.
+      * @throws FlowFileAccessException if some IO problem occurs accessing
+      * FlowFile content
+      */
+     FlowFile write(FlowFile source, OutputStreamCallback writer);
+ 
+     /**
+      * Executes the given callback against the content corresponding to the
+      * given FlowFile.
+      *
+      * @param source the FlowFile whose content the callback is executed against
+      * @param writer the callback to execute against the FlowFile's content
+      * @return updated FlowFile
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws MissingFlowFileException if the given FlowFile content cannot be
+      * found. The FlowFile should no longer be referenced, will be internally
+      * destroyed, and the session is automatically rolled back and what is left
+      * of the FlowFile is destroyed.
+      * @throws FlowFileAccessException if some IO problem occurs accessing
+      * FlowFile content
+      */
+     FlowFile write(FlowFile source, StreamCallback writer);
+ 
+     /**
+      * Executes the given callback against the content corresponding to the
+      * given FlowFile, such that any data written to the OutputStream of the
+      * content will be appended to the end of the FlowFile.
+      *
+      * @param source the FlowFile to whose content data is appended
+      * @param writer the callback whose written data is appended to the
+      * FlowFile's content
+      * @return the resulting FlowFile; presumably the updated FlowFile
+      * reflecting the appended content (not stated here - confirm against the
+      * implementation)
+      */
+     FlowFile append(FlowFile source, OutputStreamCallback writer);
+ 
+     /**
+      * Writes to the given FlowFile all content from the given content path.
+      *
+      * @param source the file from which content will be obtained
+      * @param keepSourceFile if true the content is simply copied; if false the
+      * original content might be used in a destructive way for efficiency such
+      * that the repository will have the data but the original data will be
+      * gone. If false the source object will be removed or gone once imported.
+      * It will not be restored if the session is rolled back so this must be
+      * used with caution. In some cases it can result in tremendous efficiency
+      * gains but is also dangerous.
+      * @param destination the FlowFile whose content will be updated
+      * @return the updated destination FlowFile (new size)
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws MissingFlowFileException if the given FlowFile content cannot be
+      * found. The FlowFile should no longer be referenced, will be internally
+      * destroyed, and the session is automatically rolled back and what is left
+      * of the FlowFile is destroyed.
+      * @throws FlowFileAccessException if some IO problem occurs accessing
+      * FlowFile content
+      */
+     FlowFile importFrom(Path source, boolean keepSourceFile, FlowFile destination);
+ 
+     /**
+      * Writes to the given FlowFile all content from the given input stream.
+      *
+      * @param source the stream from which content will be obtained
+      * @param destination the FlowFile whose content will be updated
+      * @return the updated destination FlowFile (new size)
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws MissingFlowFileException if the given FlowFile content cannot be
+      * found. The FlowFile should no longer be referenced, will be internally
+      * destroyed, and the session is automatically rolled back and what is left
+      * of the FlowFile is destroyed.
+      * @throws FlowFileAccessException if some IO problem occurs accessing
+      * FlowFile content
+      */
+     FlowFile importFrom(InputStream source, FlowFile destination);
+ 
+     /**
+      * Writes the content of the given FlowFile to the given destination path.
+      *
+      * @param flowFile the FlowFile whose content is written out
+      * @param destination the path to which the content is written
+      * @param append if true will append to the current content at the given
+      * path; if false will replace any current content
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws MissingFlowFileException if the given FlowFile content cannot be
+      * found. The FlowFile should no longer be referenced, will be internally
+      * destroyed, and the session is automatically rolled back and what is left
+      * of the FlowFile is destroyed.
+      * @throws FlowFileAccessException if some IO problem occurs accessing
+      * FlowFile content
+      */
+     void exportTo(FlowFile flowFile, Path destination, boolean append);
+ 
+     /**
+      * Writes the content of the given FlowFile to the given destination
+      * stream.
+      *
+      * @param flowFile the FlowFile whose content is written out
+      * @param destination the stream to which the content is written
+      * @throws IllegalStateException if detected that this method is being
+      * called from within a callback of another method in this session and for
+      * the given FlowFile(s)
+      * @throws FlowFileHandlingException if the given FlowFile is already
+      * transferred or removed or doesn't belong to this session. Automatic
+      * rollback will occur.
+      * @throws MissingFlowFileException if the given FlowFile content cannot be
+      * found. The FlowFile should no longer be referenced, will be internally
+      * destroyed, and the session is automatically rolled back and what is left
+      * of the FlowFile is destroyed.
+      * @throws FlowFileAccessException if some IO problem occurs accessing
+      * FlowFile content
+      */
+     void exportTo(FlowFile flowFile, OutputStream destination);
+ 
+     /**
+      * Returns a ProvenanceReporter that is tied to this ProcessSession.
+      *
+      * @return the ProvenanceReporter tied to this ProcessSession
+      */
+     ProvenanceReporter getProvenanceReporter();
+ }