Posted to commits@nifi.apache.org by jo...@apache.org on 2016/11/09 21:25:20 UTC

[1/2] nifi git commit: NIFI-2850 This closes #1115. Added a migrate() method to ProcessSession and refactored BinFiles and MergeContent to use it

Repository: nifi
Updated Branches:
  refs/heads/master cc2fbcaac -> c441a8696


http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 545559e..16bb911 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -77,7 +77,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.bin.Bin;
 import org.apache.nifi.processor.util.bin.BinFiles;
 import org.apache.nifi.processor.util.bin.BinManager;
-import org.apache.nifi.processor.util.FlowFileSessionWrapper;
 import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.NonCloseableOutputStream;
@@ -397,8 +396,7 @@ public class MergeContent extends BinFiles {
     }
 
     @Override
-    protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binCopy, final ProcessContext context,
-            final ProcessSession session) throws ProcessException {
+    protected boolean processBin(final Bin bin, final ProcessContext context) throws ProcessException {
 
         final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue();
         MergeBin merger;
@@ -439,63 +437,62 @@ public class MergeContent extends BinFiles {
                 break;
         }
 
+        final List<FlowFile> contents = bin.getContents();
+        final ProcessSession binSession = bin.getSession();
+
         if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
-            final String error = getDefragmentValidationError(binCopy);
+            final String error = getDefragmentValidationError(bin.getContents());
 
             // Fail the flow files and commit them
             if (error != null) {
-                final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
+                final String binDescription = contents.size() <= 10 ? contents.toString() : contents.size() + " FlowFiles";
                 getLogger().error(error + "; routing {} to failure", new Object[]{binDescription});
-                for (final FlowFileSessionWrapper wrapper : binCopy) {
-                    wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
-                    wrapper.getSession().commit();
-                }
+                binSession.transfer(contents, REL_FAILURE);
+                binSession.commit();
 
                 return true;
             }
-            Collections.sort(binCopy, new FragmentComparator());
+
+            Collections.sort(contents, new FragmentComparator());
         }
 
-        FlowFile bundle = merger.merge(context, session, binCopy);
+        FlowFile bundle = merger.merge(bin, context);
 
         // keep the filename, as it is added to the bundle.
         final String filename = bundle.getAttribute(CoreAttributes.FILENAME.key());
 
         // merge all of the attributes
-        final Map<String, String> bundleAttributes = attributeStrategy.getMergedAttributes(binCopy);
+        final Map<String, String> bundleAttributes = attributeStrategy.getMergedAttributes(contents);
         bundleAttributes.put(CoreAttributes.MIME_TYPE.key(), merger.getMergedContentType());
         // restore the filename of the bundle
         bundleAttributes.put(CoreAttributes.FILENAME.key(), filename);
-        bundleAttributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(binCopy.size()));
-        bundleAttributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(unmodifiableBin.getBinAge()));
+        bundleAttributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(contents.size()));
+        bundleAttributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(bin.getBinAge()));
 
-        bundle = session.putAllAttributes(bundle, bundleAttributes);
+        bundle = binSession.putAllAttributes(bundle, bundleAttributes);
 
-        final String inputDescription = binCopy.size() < 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
+        final String inputDescription = contents.size() < 10 ? contents.toString() : contents.size() + " FlowFiles";
         getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle});
-        session.transfer(bundle, REL_MERGED);
+        binSession.transfer(bundle, REL_MERGED);
 
-        for (final FlowFileSessionWrapper unmerged : merger.getUnmergedFlowFiles()) {
-            final ProcessSession unmergedSession = unmerged.getSession();
-            final FlowFile unmergedCopy = unmergedSession.clone(unmerged.getFlowFile());
-            unmergedSession.transfer(unmergedCopy, REL_FAILURE);
+        for (final FlowFile unmerged : merger.getUnmergedFlowFiles()) {
+            final FlowFile unmergedCopy = binSession.clone(unmerged);
+            binSession.transfer(unmergedCopy, REL_FAILURE);
         }
 
         // We haven't committed anything, parent will take care of it
         return false;
     }
 
-    private String getDefragmentValidationError(final List<FlowFileSessionWrapper> bin) {
-        if (bin.isEmpty()) {
+    private String getDefragmentValidationError(final List<FlowFile> binContents) {
+        if (binContents.isEmpty()) {
             return null;
         }
 
         // If we are defragmenting, all fragments must have the appropriate attributes.
         String decidedFragmentCount = null;
         String fragmentIdentifier = null;
-        for (final FlowFileSessionWrapper flowFileWrapper : bin) {
-            final FlowFile flowFile = flowFileWrapper.getFlowFile();
-
+        for (final FlowFile flowFile : binContents) {
             final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE);
             if (!isNumber(fragmentIndex)) {
                 return "Cannot Defragment " + flowFile + " because it does not have an integer value for the " + FRAGMENT_INDEX_ATTRIBUTE + " attribute";
@@ -521,14 +518,14 @@ public class MergeContent extends BinFiles {
             return "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentIdentifier + " because the " + FRAGMENT_COUNT_ATTRIBUTE + " has a non-integer value of " + decidedFragmentCount;
         }
 
-        if (bin.size() < numericFragmentCount) {
+        if (binContents.size() < numericFragmentCount) {
             return "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentIdentifier + " because the expected number of fragments is " + decidedFragmentCount + " but found only "
-                    + bin.size() + " fragments";
+                + binContents.size() + " fragments";
         }
 
-        if (bin.size() > numericFragmentCount) {
+        if (binContents.size() > numericFragmentCount) {
             return "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentIdentifier + " because the expected number of fragments is " + decidedFragmentCount + " but found "
-                    + bin.size() + " fragments for this identifier";
+                + binContents.size() + " fragments for this identifier";
         }
 
         return null;
@@ -550,27 +547,25 @@ public class MergeContent extends BinFiles {
         }
 
         @Override
-        public FlowFile merge(final ProcessContext context, final ProcessSession session, final List<FlowFileSessionWrapper> wrappers) {
-            final Set<FlowFile> parentFlowFiles = new HashSet<>();
-            for (final FlowFileSessionWrapper wrapper : wrappers) {
-                parentFlowFiles.add(wrapper.getFlowFile());
-            }
+        public FlowFile merge(final Bin bin, final ProcessContext context) {
+            final List<FlowFile> contents = bin.getContents();
 
-            FlowFile bundle = session.create(parentFlowFiles);
+            final ProcessSession session = bin.getSession();
+            FlowFile bundle = session.create(bin.getContents());
             final AtomicReference<String> bundleMimeTypeRef = new AtomicReference<>(null);
             bundle = session.write(bundle, new OutputStreamCallback() {
                 @Override
                 public void process(final OutputStream out) throws IOException {
-                    final byte[] header = getDelimiterContent(context, wrappers, HEADER);
+                    final byte[] header = getDelimiterContent(context, contents, HEADER);
                     if (header != null) {
                         out.write(header);
                     }
 
                     boolean isFirst = true;
-                    final Iterator<FlowFileSessionWrapper> itr = wrappers.iterator();
+                    final Iterator<FlowFile> itr = contents.iterator();
                     while (itr.hasNext()) {
-                        final FlowFileSessionWrapper wrapper = itr.next();
-                        wrapper.getSession().read(wrapper.getFlowFile(), false, new InputStreamCallback() {
+                        final FlowFile flowFile = itr.next();
+                        bin.getSession().read(flowFile, false, new InputStreamCallback() {
                             @Override
                             public void process(final InputStream in) throws IOException {
                                 StreamUtils.copy(in, out);
@@ -578,13 +573,13 @@ public class MergeContent extends BinFiles {
                         });
 
                         if (itr.hasNext()) {
-                            final byte[] demarcator = getDelimiterContent(context, wrappers, DEMARCATOR);
+                            final byte[] demarcator = getDelimiterContent(context, contents, DEMARCATOR);
                             if (demarcator != null) {
                                 out.write(demarcator);
                             }
                         }
 
-                        final String flowFileMimeType = wrapper.getFlowFile().getAttribute(CoreAttributes.MIME_TYPE.key());
+                        final String flowFileMimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
                         if (isFirst) {
                             bundleMimeTypeRef.set(flowFileMimeType);
                             isFirst = false;
@@ -595,15 +590,15 @@ public class MergeContent extends BinFiles {
                         }
                     }
 
-                    final byte[] footer = getDelimiterContent(context, wrappers, FOOTER);
+                    final byte[] footer = getDelimiterContent(context, contents, FOOTER);
                     if (footer != null) {
                         out.write(footer);
                     }
                 }
             });
 
-            session.getProvenanceReporter().join(getFlowFiles(wrappers), bundle);
-            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(wrappers));
+            session.getProvenanceReporter().join(contents, bundle);
+            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents));
             if (bundleMimeTypeRef.get() != null) {
                 this.mimeType = bundleMimeTypeRef.get();
             }
@@ -611,8 +606,7 @@ public class MergeContent extends BinFiles {
             return bundle;
         }
 
-        private byte[] getDelimiterContent(final ProcessContext context, final List<FlowFileSessionWrapper> wrappers, final PropertyDescriptor descriptor)
-                throws IOException {
+        private byte[] getDelimiterContent(final ProcessContext context, final List<FlowFile> wrappers, final PropertyDescriptor descriptor) throws IOException {
             final String delimiterStrategyValue = context.getProperty(DELIMITER_STRATEGY).getValue();
             if (DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategyValue)) {
                 return getDelimiterFileContent(context, wrappers, descriptor);
@@ -621,36 +615,30 @@ public class MergeContent extends BinFiles {
             }
         }
 
-        private byte[] getDelimiterFileContent(final ProcessContext context, final List<FlowFileSessionWrapper> wrappers, final PropertyDescriptor descriptor)
+        private byte[] getDelimiterFileContent(final ProcessContext context, final List<FlowFile> flowFiles, final PropertyDescriptor descriptor)
                 throws IOException {
             byte[] property = null;
             final String descriptorValue = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
-            if (descriptorValue != null && wrappers != null && wrappers.size() > 0) {
+            if (descriptorValue != null && flowFiles != null && flowFiles.size() > 0) {
                 final String content = new String(readContent(descriptorValue));
-                final FlowFileSessionWrapper wrapper = wrappers.get(0);
-                if (wrapper != null && content != null) {
-                    final FlowFile flowFile = wrapper.getFlowFile();
-                    if (flowFile != null) {
-                        final PropertyValue propVal = context.newPropertyValue(content).evaluateAttributeExpressions(flowFile);
-                        property = propVal.getValue().getBytes();
-                    }
+                final FlowFile flowFile = flowFiles.get(0);
+                if (flowFile != null && content != null) {
+                    final PropertyValue propVal = context.newPropertyValue(content).evaluateAttributeExpressions(flowFile);
+                    property = propVal.getValue().getBytes();
                 }
             }
             return property;
         }
 
-        private byte[] getDelimiterTextContent(final ProcessContext context, final List<FlowFileSessionWrapper> wrappers, final PropertyDescriptor descriptor)
+        private byte[] getDelimiterTextContent(final ProcessContext context, final List<FlowFile> flowFiles, final PropertyDescriptor descriptor)
                 throws IOException {
             byte[] property = null;
-            if (wrappers != null && wrappers.size() > 0) {
-                final FlowFileSessionWrapper wrapper = wrappers.get(0);
-                if (wrapper != null) {
-                    final FlowFile flowFile = wrapper.getFlowFile();
-                    if (flowFile != null) {
-                        final String value = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
-                        if (value != null) {
-                            property = value.getBytes();
-                        }
+            if (flowFiles != null && flowFiles.size() > 0) {
+                final FlowFile flowFile = flowFiles.get(0);
+                if (flowFile != null) {
+                    final String value = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
+                    if (value != null) {
+                        property = value.getBytes();
                     }
                 }
             }
@@ -663,18 +651,11 @@ public class MergeContent extends BinFiles {
         }
 
         @Override
-        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+        public List<FlowFile> getUnmergedFlowFiles() {
             return Collections.emptyList();
         }
     }
 
-    private List<FlowFile> getFlowFiles(final List<FlowFileSessionWrapper> sessionWrappers) {
-        final List<FlowFile> flowFiles = new ArrayList<>();
-        for (final FlowFileSessionWrapper wrapper : sessionWrappers) {
-            flowFiles.add(wrapper.getFlowFile());
-        }
-        return flowFiles;
-    }
 
     private String getPath(final FlowFile flowFile) {
         Path path = Paths.get(flowFile.getAttribute(CoreAttributes.PATH.key()));
@@ -689,11 +670,11 @@ public class MergeContent extends BinFiles {
         return path == null ? "" : path.toString() + "/";
     }
 
-    private String createFilename(final List<FlowFileSessionWrapper> wrappers) {
-        if (wrappers.size() == 1) {
-            return wrappers.get(0).getFlowFile().getAttribute(CoreAttributes.FILENAME.key());
+    private String createFilename(final List<FlowFile> flowFiles) {
+        if (flowFiles.size() == 1) {
+            return flowFiles.get(0).getAttribute(CoreAttributes.FILENAME.key());
         } else {
-            final FlowFile ff = wrappers.get(0).getFlowFile();
+            final FlowFile ff = flowFiles.get(0);
             final String origFilename = ff.getAttribute(SEGMENT_ORIGINAL_FILENAME);
             if (origFilename != null) {
                 return origFilename;
@@ -706,20 +687,21 @@ public class MergeContent extends BinFiles {
     private class TarMerge implements MergeBin {
 
         @Override
-        public FlowFile merge(final ProcessContext context, final ProcessSession session, final List<FlowFileSessionWrapper> wrappers) {
+        public FlowFile merge(final Bin bin, final ProcessContext context) {
+            final List<FlowFile> contents = bin.getContents();
+            final ProcessSession session = bin.getSession();
+
             final boolean keepPath = context.getProperty(KEEP_PATH).asBoolean();
             FlowFile bundle = session.create(); // we don't pass the parents to the #create method because the parents belong to different sessions
 
-            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(wrappers) + ".tar");
+            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".tar");
             bundle = session.write(bundle, new OutputStreamCallback() {
                 @Override
                 public void process(final OutputStream rawOut) throws IOException {
                     try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
                             final TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut)) {
                         out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
-                        for (final FlowFileSessionWrapper wrapper : wrappers) {
-                            final FlowFile flowFile = wrapper.getFlowFile();
-
+                        for (final FlowFile flowFile : contents) {
                             final String path = keepPath ? getPath(flowFile) : "";
                             final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
 
@@ -737,14 +719,14 @@ public class MergeContent extends BinFiles {
 
                             out.putArchiveEntry(tarEntry);
 
-                            wrapper.getSession().exportTo(flowFile, out);
+                            bin.getSession().exportTo(flowFile, out);
                             out.closeArchiveEntry();
                         }
                     }
                 }
             });
 
-            session.getProvenanceReporter().join(getFlowFiles(wrappers), bundle);
+            bin.getSession().getProvenanceReporter().join(contents, bundle);
             return bundle;
         }
 
@@ -754,7 +736,7 @@ public class MergeContent extends BinFiles {
         }
 
         @Override
-        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+        public List<FlowFile> getUnmergedFlowFiles() {
             return Collections.emptyList();
         }
     }
@@ -770,8 +752,11 @@ public class MergeContent extends BinFiles {
         }
 
         @Override
-        public FlowFile merge(final ProcessContext context, final ProcessSession session, final List<FlowFileSessionWrapper> wrappers) {
-            FlowFile bundle = session.create(); // we don't pass the parents to the #create method because the parents belong to different sessions
+        public FlowFile merge(final Bin bin, final ProcessContext context) {
+            final ProcessSession session = bin.getSession();
+            final List<FlowFile> contents = bin.getContents();
+
+            FlowFile bundle = session.create(contents);
 
             bundle = session.write(bundle, new OutputStreamCallback() {
                 @Override
@@ -781,9 +766,8 @@ public class MergeContent extends BinFiles {
                         // closed, which in turn closes the underlying OutputStream, and we want to protect ourselves against that.
                         final OutputStream out = new NonCloseableOutputStream(bufferedOut);
 
-                        for (final FlowFileSessionWrapper wrapper : wrappers) {
-                            final FlowFile flowFile = wrapper.getFlowFile();
-                            wrapper.getSession().read(flowFile, false, new InputStreamCallback() {
+                        for (final FlowFile flowFile : contents) {
+                            bin.getSession().read(flowFile, false, new InputStreamCallback() {
                                 @Override
                                 public void process(final InputStream rawIn) throws IOException {
                                     try (final InputStream in = new BufferedInputStream(rawIn)) {
@@ -805,8 +789,8 @@ public class MergeContent extends BinFiles {
                 }
             });
 
-            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(wrappers) + ".pkg");
-            session.getProvenanceReporter().join(getFlowFiles(wrappers), bundle);
+            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".pkg");
+            session.getProvenanceReporter().join(contents, bundle);
             return bundle;
         }
 
@@ -816,7 +800,7 @@ public class MergeContent extends BinFiles {
         }
 
         @Override
-        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+        public List<FlowFile> getUnmergedFlowFiles() {
             return Collections.emptyList();
         }
     }
@@ -830,28 +814,29 @@ public class MergeContent extends BinFiles {
         }
 
         @Override
-        public FlowFile merge(final ProcessContext context, final ProcessSession session, final List<FlowFileSessionWrapper> wrappers) {
+        public FlowFile merge(final Bin bin, final ProcessContext context) {
             final boolean keepPath = context.getProperty(KEEP_PATH).asBoolean();
 
-            FlowFile bundle = session.create(); // we don't pass the parents to the #create method because the parents belong to different sessions
+            final ProcessSession session = bin.getSession();
+            final List<FlowFile> contents = bin.getContents();
+
+            FlowFile bundle = session.create(contents);
 
-            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(wrappers) + ".zip");
+            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".zip");
             bundle = session.write(bundle, new OutputStreamCallback() {
                 @Override
                 public void process(final OutputStream rawOut) throws IOException {
                     try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
                             final ZipOutputStream out = new ZipOutputStream(bufferedOut)) {
                         out.setLevel(compressionLevel);
-                        for (final FlowFileSessionWrapper wrapper : wrappers) {
-                            final FlowFile flowFile = wrapper.getFlowFile();
-
+                        for (final FlowFile flowFile : contents) {
                             final String path = keepPath ? getPath(flowFile) : "";
                             final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
                             final ZipEntry zipEntry = new ZipEntry(entryName);
                             zipEntry.setSize(flowFile.getSize());
                             out.putNextEntry(zipEntry);
 
-                            wrapper.getSession().exportTo(flowFile, out);
+                            bin.getSession().exportTo(flowFile, out);
                             out.closeEntry();
                         }
 
@@ -861,7 +846,7 @@ public class MergeContent extends BinFiles {
                 }
             });
 
-            session.getProvenanceReporter().join(getFlowFiles(wrappers), bundle);
+            session.getProvenanceReporter().join(contents, bundle);
             return bundle;
         }
 
@@ -871,17 +856,19 @@ public class MergeContent extends BinFiles {
         }
 
         @Override
-        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+        public List<FlowFile> getUnmergedFlowFiles() {
             return Collections.emptyList();
         }
     }
 
     private class AvroMerge implements MergeBin {
 
-        private List<FlowFileSessionWrapper> unmerged = new ArrayList<>();
+        private List<FlowFile> unmerged = new ArrayList<>();
 
         @Override
-        public FlowFile merge(ProcessContext context, final ProcessSession session, final List<FlowFileSessionWrapper> wrappers) {
+        public FlowFile merge(final Bin bin, final ProcessContext context) {
+            final ProcessSession session = bin.getSession();
+            final List<FlowFile> contents = bin.getContents();
 
             final Map<String, byte[]> metadata = new TreeMap<>();
             final AtomicReference<Schema> schema = new AtomicReference<>(null);
@@ -889,14 +876,13 @@ public class MergeContent extends BinFiles {
             final DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
 
             // we don't pass the parents to the #create method because the parents belong to different sessions
-            FlowFile bundle = session.create();
+            FlowFile bundle = session.create(contents);
             bundle = session.write(bundle, new OutputStreamCallback() {
                 @Override
                 public void process(final OutputStream rawOut) throws IOException {
                     try (final OutputStream out = new BufferedOutputStream(rawOut)) {
-                        for (final FlowFileSessionWrapper wrapper : wrappers) {
-                            final FlowFile flowFile = wrapper.getFlowFile();
-                            wrapper.getSession().read(flowFile, false, new InputStreamCallback() {
+                        for (final FlowFile flowFile : contents) {
+                            bin.getSession().read(flowFile, false, new InputStreamCallback() {
                                 @Override
                                 public void process(InputStream in) throws IOException {
                                     boolean canMerge = true;
@@ -925,7 +911,7 @@ public class MergeContent extends BinFiles {
                                                 getLogger().debug("Input file {} has different schema - {}, not merging",
                                                         new Object[]{flowFile.getId(), reader.getSchema().getName()});
                                                 canMerge = false;
-                                                unmerged.add(wrapper);
+                                                unmerged.add(flowFile);
                                             }
 
                                             // check that we're appending to the same metadata
@@ -937,7 +923,7 @@ public class MergeContent extends BinFiles {
                                                         getLogger().debug("Input file {} has different non-reserved metadata, not merging",
                                                                 new Object[]{flowFile.getId()});
                                                         canMerge = false;
-                                                        unmerged.add(wrapper);
+                                                        unmerged.add(flowFile);
                                                     }
                                                 }
                                             }
@@ -951,7 +937,7 @@ public class MergeContent extends BinFiles {
                                                 getLogger().debug("Input file {} has different codec, not merging",
                                                         new Object[]{flowFile.getId()});
                                                 canMerge = false;
-                                                unmerged.add(wrapper);
+                                                unmerged.add(flowFile);
                                             }
                                         }
 
@@ -970,7 +956,15 @@ public class MergeContent extends BinFiles {
                 }
             });
 
-            session.getProvenanceReporter().join(getFlowFiles(wrappers), bundle);
+            final Collection<FlowFile> parents;
+            if (unmerged.isEmpty()) {
+                parents = contents;
+            } else {
+                parents = new HashSet<>(contents);
+                parents.removeAll(unmerged);
+            }
+
+            session.getProvenanceReporter().join(parents, bundle);
             return bundle;
         }
 
@@ -980,7 +974,7 @@ public class MergeContent extends BinFiles {
         }
 
         @Override
-        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+        public List<FlowFile> getUnmergedFlowFiles() {
             return unmerged;
         }
     }
@@ -988,13 +982,11 @@ public class MergeContent extends BinFiles {
     private static class KeepUniqueAttributeStrategy implements AttributeStrategy {
 
         @Override
-        public Map<String, String> getMergedAttributes(final List<FlowFileSessionWrapper> flowFiles) {
+        public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
             final Map<String, String> newAttributes = new HashMap<>();
             final Set<String> conflicting = new HashSet<>();
 
-            for (final FlowFileSessionWrapper wrapper : flowFiles) {
-                final FlowFile flowFile = wrapper.getFlowFile();
-
+            for (final FlowFile flowFile : flowFiles) {
                 for (final Map.Entry<String, String> attributeEntry : flowFile.getAttributes().entrySet()) {
                     final String name = attributeEntry.getKey();
                     final String value = attributeEntry.getValue();
@@ -1021,29 +1013,29 @@ public class MergeContent extends BinFiles {
     private static class KeepCommonAttributeStrategy implements AttributeStrategy {
 
         @Override
-        public Map<String, String> getMergedAttributes(final List<FlowFileSessionWrapper> flowFiles) {
+        public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
             final Map<String, String> result = new HashMap<>();
 
             //trivial cases
             if (flowFiles == null || flowFiles.isEmpty()) {
                 return result;
             } else if (flowFiles.size() == 1) {
-                result.putAll(flowFiles.iterator().next().getFlowFile().getAttributes());
+                result.putAll(flowFiles.iterator().next().getAttributes());
             }
 
             /*
              * Start with the first attribute map and only put an entry to the
              * resultant map if it is common to every map.
              */
-            final Map<String, String> firstMap = flowFiles.iterator().next().getFlowFile().getAttributes();
+            final Map<String, String> firstMap = flowFiles.iterator().next().getAttributes();
 
             outer:
             for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
                 final String key = mapEntry.getKey();
                 final String value = mapEntry.getValue();
 
-                for (final FlowFileSessionWrapper flowFileWrapper : flowFiles) {
-                    final Map<String, String> currMap = flowFileWrapper.getFlowFile().getAttributes();
+                for (final FlowFile flowFile : flowFiles) {
+                    final Map<String, String> currMap = flowFile.getAttributes();
                     final String curVal = currMap.get(key);
                     if (curVal == null || !curVal.equals(value)) {
                         continue outer;
@@ -1058,27 +1050,27 @@ public class MergeContent extends BinFiles {
         }
     }
 
-    private static class FragmentComparator implements Comparator<FlowFileSessionWrapper> {
+    private static class FragmentComparator implements Comparator<FlowFile> {
 
         @Override
-        public int compare(final FlowFileSessionWrapper o1, final FlowFileSessionWrapper o2) {
-            final int fragmentIndex1 = Integer.parseInt(o1.getFlowFile().getAttribute(FRAGMENT_INDEX_ATTRIBUTE));
-            final int fragmentIndex2 = Integer.parseInt(o2.getFlowFile().getAttribute(FRAGMENT_INDEX_ATTRIBUTE));
+        public int compare(final FlowFile o1, final FlowFile o2) {
+            final int fragmentIndex1 = Integer.parseInt(o1.getAttribute(FRAGMENT_INDEX_ATTRIBUTE));
+            final int fragmentIndex2 = Integer.parseInt(o2.getAttribute(FRAGMENT_INDEX_ATTRIBUTE));
             return Integer.compare(fragmentIndex1, fragmentIndex2);
         }
     }
 
     private interface MergeBin {
 
-        FlowFile merge(ProcessContext context, ProcessSession session, List<FlowFileSessionWrapper> flowFiles);
+        FlowFile merge(Bin bin, ProcessContext context);
 
         String getMergedContentType();
 
-        List<FlowFileSessionWrapper> getUnmergedFlowFiles();
+        List<FlowFile> getUnmergedFlowFiles();
     }
 
     private interface AttributeStrategy {
 
-        Map<String, String> getMergedAttributes(List<FlowFileSessionWrapper> flowFiles);
+        Map<String, String> getMergedAttributes(List<FlowFile> flowFiles);
     }
 }

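For orientation, a minimal sketch of a custom merger written against the refactored MergeBin interface above. The class name NewlineConcatMerge is hypothetical, Java 8 lambdas stand in for the anonymous callback classes used elsewhere in this file, and the class is assumed to sit inside MergeContent alongside the other mergers, reusing that file's imports:

    private class NewlineConcatMerge implements MergeBin {
        @Override
        public FlowFile merge(final Bin bin, final ProcessContext context) {
            // A single session now owns every FlowFile in the bin, so the contents
            // can be passed to create() as parents and read without session wrappers.
            final ProcessSession session = bin.getSession();
            final List<FlowFile> contents = bin.getContents();

            FlowFile bundle = session.create(contents);
            bundle = session.write(bundle, out -> {
                for (final FlowFile flowFile : contents) {
                    session.read(flowFile, false, in -> StreamUtils.copy(in, out));
                    out.write('\n');
                }
            });

            session.getProvenanceReporter().join(contents, bundle);
            return bundle;
        }

        @Override
        public String getMergedContentType() {
            return "text/plain";
        }

        @Override
        public List<FlowFile> getUnmergedFlowFiles() {
            return Collections.emptyList();
        }
    }
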
http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index e738759..3a6d07c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -397,9 +397,18 @@ public class TestMergeContent {
         runner.assertTransferCount(MergeContent.REL_ORIGINAL, 0);
 
         attrs.remove("correlationId");
-        runner.enqueue(new byte[0], attrs);
 
         runner.clearTransferState();
+
+        // Run a single iteration but do not perform the @OnStopped action because
+        // we do not want to purge our Bin Manager. This causes some bins to get
+        // created. We then enqueue a FlowFile with no correlation id. We do it this
+        // way because if we just run a single iteration, then all FlowFiles will be
+        // pulled in at once, and we don't know if the first bin to be created will
+        // have 5 FlowFiles or 1 FlowFile, since this one that we are about to enqueue
+        // will be in a separate bin.
+        runner.run(1, false, true);
+        runner.enqueue(new byte[0], attrs);
         runner.run();
 
         runner.assertTransferCount(MergeContent.REL_MERGED, 1);


[2/2] nifi git commit: NIFI-2850 This closes #1115. Added a migrate() method to ProcessSession and refactored BinFiles and MergeContent to use it

Posted by jo...@apache.org.
NIFI-2850 This closes #1115. Added a migrate() method to ProcessSession and refactored BinFiles and MergeContent to use it


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c441a869
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c441a869
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c441a869

Branch: refs/heads/master
Commit: c441a8696d8dce47f263846cdf5ecac8506ba5d3
Parents: cc2fbca
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Sep 29 14:41:35 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Wed Nov 9 16:25:03 2016 -0500

----------------------------------------------------------------------
 .../apache/nifi/processor/ProcessSession.java   |  56 ++++-
 .../nifi/provenance/ProvenanceEventBuilder.java |  45 +++-
 .../StandardProvenanceEventRecord.java          |  77 +++++-
 .../processor/util/FlowFileSessionWrapper.java  |  44 ----
 .../org/apache/nifi/processor/util/bin/Bin.java |  20 +-
 .../nifi/processor/util/bin/BinFiles.java       |  90 +++----
 .../nifi/processor/util/bin/BinManager.java     |  84 ++++++-
 .../apache/nifi/util/MockProcessSession.java    | 118 +++++++--
 .../nifi/util/MockProvenanceReporter.java       |  12 +
 .../repository/BatchingSessionFactory.java      |   5 +
 .../repository/StandardProcessSession.java      | 248 +++++++++++++++---
 .../repository/StandardProvenanceReporter.java  |  12 +
 .../nifi/processor/StandardProcessContext.java  |   6 +-
 .../repository/TestStandardProcessSession.java  |  32 +++
 .../nifi/processors/hive/PutHiveStreaming.java  |  10 +-
 .../processors/hive/TestPutHiveStreaming.java   |   3 -
 .../nifi/processors/standard/MergeContent.java  | 252 +++++++++----------
 .../processors/standard/TestMergeContent.java   |  11 +-
 18 files changed, 800 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
index 704d2cc..80fa6c0 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
@@ -43,11 +43,13 @@ import org.apache.nifi.provenance.ProvenanceReporter;
  * session is always tied to a single processor at any one time and ensures no
  * FlowFile can ever be accessed by any more than one processor at a given time.
  * The session also ensures that all FlowFiles are always accounted for. The
- * creator of a ProcessSession is always required to manage the session.</p>
+ * creator of a ProcessSession is always required to manage the session.
+ * </p>
  *
  * <p>
  * A session is not considered thread safe. The session supports a unit of work
- * that is either committed or rolled back</p>
+ * that is either committed or rolled back
+ * </p>
  *
  * <p>
  * As noted on specific methods and for specific exceptions automated rollback
@@ -55,12 +57,22 @@ import org.apache.nifi.provenance.ProvenanceReporter;
  * situations can result in exceptions yet not cause automated rollback. In
  * these cases the consistency of the repository will be retained but callers
  * will be able to indicate whether it should result in rollback or continue on
- * toward a commit.</p>
+ * toward a commit.
+ * </p>
  *
  * <p>
- * A process session instance may be used continuously. That is, after each
- * commit or rollback, the session can be used again.</p>
- *
+ * A process session has two 'terminal' methods that will result in the process session
+ * being in a 'fresh' state, containing no knowledge of any FlowFile, as if the session were newly
+ * created. After one of these methods is called, the instance may be used again. The terminal
+ * methods for a Process Session are {@link #commit()} and {@link #rollback()}. Additionally,
+ * the {@link #migrate(ProcessSession, Collection)} method results in {@code this} containing
+ * no knowledge of any of the FlowFiles that are provided, as if the FlowFiles never existed in
+ * this ProcessSession. After each commit or rollback, the session can be used again. Note, however,
+ * that even if all FlowFiles are migrated via the {@link #migrate(ProcessSession, Collection)} method,
+ * this Process Session is not entirely cleared, as it still has knowledge of Counters that were adjusted
+ * via the {@link #adjustCounter(String, long, boolean)} method. A commit or rollback will clear these
+ * counters, as well.
+ * </p>
  */
 public interface ProcessSession {
 
@@ -109,6 +121,32 @@ public interface ProcessSession {
     void rollback(boolean penalize);
 
     /**
+     * <p>
+     * Migrates ownership of the given FlowFiles from {@code this} to the given {@code newOwner}.
+     * </p>
+     *
+     * <p>
+     * When calling this method, all of the following pre-conditions must be met:
+     * </p>
+     *
+     * <ul>
+     * <li>This method cannot be called from within a callback
+     * (see {@link #write(FlowFile, OutputStreamCallback)}, {@link #write(FlowFile, StreamCallback)},
+     * {@link #read(FlowFile, InputStreamCallback)}, {@link #read(FlowFile, boolean, InputStreamCallback)}) for any of
+     * the given FlowFiles.</li>
+     * <li>No InputStream can be open for the content of any of the given FlowFiles (see {@link #read(FlowFile)}).</li>
+     * <li>Each of the FlowFiles provided must be the most up-to-date copy of the FlowFile.</li>
+     * <li>For any provided FlowFile, if the FlowFile has any child (e.g., by calling {@link #create(FlowFile)} and passing the FlowFile
+     * as the argument), then all children that were created must also be in the Collection of provided FlowFiles.</li>
+     * </ul>
+     *
+     * @param newOwner the ProcessSession that is to become the new owner of all FlowFiles
+     *            that currently belong to {@code this}.
+     * @param flowFiles the FlowFiles to migrate
+     */
+    void migrate(ProcessSession newOwner, Collection<FlowFile> flowFiles);
+
+    /**
      * Adjusts counter data for the given counter name and takes care of
      * registering the counter if not already present. The adjustment occurs
      * only if and when the ProcessSession is committed.
@@ -116,9 +154,9 @@ public interface ProcessSession {
      * @param name the name of the counter
      * @param delta the delta by which to modify the counter (+ or -)
      * @param immediate if true, the counter will be updated immediately,
-     * without regard to whether the ProcessSession is commit or rolled back;
-     * otherwise, the counter will be incremented only if and when the
-     * ProcessSession is committed.
+     *            without regard to whether the ProcessSession is committed or rolled back;
+     *            otherwise, the counter will be incremented only if and when the
+     *            ProcessSession is committed.
      */
     void adjustCounter(String name, long delta, boolean immediate);
 

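To make the migrate() contract above concrete, a rough usage sketch. The onTrigger signature and batch size are illustrative, and the listed preconditions (no open callbacks or content streams, up-to-date FlowFiles, children included in the collection) are assumed to hold:

    // Hypothetical processor fragment: pull a batch with the triggering session,
    // then hand ownership to a longer-lived session (e.g., one owned by a bin).
    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
        final ProcessSession session = sessionFactory.createSession();
        final List<FlowFile> batch = session.get(10);
        if (batch.isEmpty()) {
            return;
        }

        final ProcessSession binSession = sessionFactory.createSession();
        session.migrate(binSession, batch); // 'session' retains no knowledge of these FlowFiles
        session.commit();                   // commits nothing for the migrated FlowFiles

        // ... later, once the bin is processed: binSession.transfer(...) and binSession.commit()
    }
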
http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventBuilder.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventBuilder.java
index d0e20d1..38e39a2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventBuilder.java
+++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventBuilder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.provenance;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.nifi.flowfile.FlowFile;
@@ -180,11 +181,11 @@ public interface ProvenanceEventBuilder {
     ProvenanceEventBuilder setTransitUri(String transitUri);
 
     /**
-     * Adds the given FlowFile as a parent for Events of type {@link ProvenanceEventType#SPAWN},
+     * Adds the given FlowFile as a parent for Events of type
      * {@link ProvenanceEventType#FORK}, {@link ProvenanceEventType#JOIN}, and
      * {@link ProvenanceEventType#CLONE}
      *
-     * This is valid only for {@link ProvenanceEventType#SPAWN},
+     * This is valid only for
      * {@link ProvenanceEventType#FORK}, {@link ProvenanceEventType#JOIN}, and
      * {@link ProvenanceEventType#CLONE} events and will be ignored for any
      * other event types.
@@ -195,11 +196,11 @@ public interface ProvenanceEventBuilder {
     ProvenanceEventBuilder addParentFlowFile(FlowFile parent);
 
     /**
-     * Removes the given FlowFile as a parent for Events of type {@link ProvenanceEventType#SPAWN},
+     * Removes the given FlowFile as a parent for Events of type
      * {@link ProvenanceEventType#FORK}, {@link ProvenanceEventType#JOIN}, and
      * {@link ProvenanceEventType#CLONE}
      *
-     * This is valid only for {@link ProvenanceEventType#SPAWN},
+     * This is valid only for
      * {@link ProvenanceEventType#FORK}, {@link ProvenanceEventType#JOIN}, and
      * {@link ProvenanceEventType#CLONE} events and will be ignored for any
      * other event types.
@@ -210,11 +211,11 @@ public interface ProvenanceEventBuilder {
     ProvenanceEventBuilder removeParentFlowFile(FlowFile parent);
 
     /**
-     * Adds the given FlowFile as a child for Events of type {@link ProvenanceEventType#SPAWN},
+     * Adds the given FlowFile as a child for Events of type
      * {@link ProvenanceEventType#FORK}, {@link ProvenanceEventType#JOIN}, and
      * {@link ProvenanceEventType#CLONE}
      *
-     * This is valid only for {@link ProvenanceEventType#SPAWN},
+     * This is valid only for
      * {@link ProvenanceEventType#FORK}, {@link ProvenanceEventType#JOIN}, and
      * {@link ProvenanceEventType#CLONE} events and will be ignored for any
      * other event types.
@@ -225,11 +226,20 @@ public interface ProvenanceEventBuilder {
     ProvenanceEventBuilder addChildFlowFile(FlowFile child);
 
     /**
-     * Removes the given FlowFile as a child for Events of type {@link ProvenanceEventType#SPAWN},
+     * Adds the given FlowFile identifier as a child for Events of type
+     * {@link ProvenanceEventType#FORK} and {@link ProvenanceEventType#CLONE}
+     *
+     * @param childId the ID of the FlowFile that is a child
+     * @return the builder
+     */
+    ProvenanceEventBuilder addChildFlowFile(String childId);
+
+    /**
+     * Removes the given FlowFile as a child for Events of type
      * {@link ProvenanceEventType#FORK}, {@link ProvenanceEventType#JOIN}, and
      * {@link ProvenanceEventType#CLONE}
      *
-     * This is valid only for {@link ProvenanceEventType#SPAWN},
+     * This is valid only for
      * {@link ProvenanceEventType#FORK}, {@link ProvenanceEventType#JOIN}, and
      * {@link ProvenanceEventType#CLONE} events and will be ignored for any
      * other event types.
@@ -295,4 +305,23 @@ public interface ProvenanceEventBuilder {
      */
     ProvenanceEventRecord build();
 
+    /**
+     * @return the ids of all FlowFiles that have been added as children via {@link #addChildFlowFile(FlowFile)}
+     */
+    List<String> getChildFlowFileIds();
+
+    /**
+     * @return the ids of all FlowFiles that have been added as parents via {@link #addParentFlowFile(FlowFile)}
+     */
+    List<String> getParentFlowFileIds();
+
+    /**
+     * @return the id of the FlowFile for which the event is being built
+     */
+    String getFlowFileId();
+
+    /**
+     * @return a new Provenance Event Builder that is identical to this one (a deep copy)
+     */
+    ProvenanceEventBuilder copy();
 }

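A brief illustrative sketch of the new builder methods; the variables pending and newChildUuid are hypothetical, and the scenario (duplicating a pending event before registering an additional child) is an assumption rather than the commit's internal usage:

    // copy() yields a deep copy, so mutating the duplicate leaves the original intact;
    // addChildFlowFile(String) registers a child by UUID without needing the FlowFile object.
    final ProvenanceEventBuilder speculative = pending.copy();
    speculative.addChildFlowFile(newChildUuid);

    // The new getters expose what has accumulated on the builder so far.
    final List<String> childIds = speculative.getChildFlowFileIds();
    final String ownerUuid = speculative.getFlowFileId();

    final ProvenanceEventRecord record = speculative.build();
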
http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
index 34de366..cafdc97 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
@@ -411,7 +411,6 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
         private String uuid = null;
         private List<String> parentUuids = null;
         private List<String> childrenUuids = null;
-        private String contentType = null;
         private String alternateIdentifierUri = null;
         private String details = null;
         private String relationship = null;
@@ -480,6 +479,55 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
         }
 
         @Override
+        public ProvenanceEventBuilder copy() {
+            final Builder copy = new Builder();
+            copy.eventTime = eventTime;
+            copy.entryDate = entryDate;
+            copy.lineageStartDate = lineageStartDate;
+            copy.eventType = eventType;
+            copy.componentId = componentId;
+            copy.componentType = componentType;
+            copy.transitUri = transitUri;
+            copy.sourceSystemFlowFileIdentifier = sourceSystemFlowFileIdentifier;
+            copy.uuid = uuid;
+            if (parentUuids != null) {
+                copy.parentUuids = new ArrayList<>(parentUuids);
+            }
+            if (childrenUuids != null) {
+                copy.childrenUuids = new ArrayList<>(childrenUuids);
+            }
+            copy.alternateIdentifierUri = alternateIdentifierUri;
+            copy.eventDuration = eventDuration;
+            if (previousAttributes != null) {
+                copy.previousAttributes = new HashMap<>(previousAttributes);
+            }
+            if (updatedAttributes != null) {
+                copy.updatedAttributes = new HashMap<>(updatedAttributes);
+            }
+            copy.details = details;
+            copy.relationship = relationship;
+
+            copy.contentClaimContainer = contentClaimContainer;
+            copy.contentClaimSection = contentClaimSection;
+            copy.contentClaimIdentifier = contentClaimIdentifier;
+            copy.contentClaimOffset = contentClaimOffset;
+            copy.contentSize = contentSize;
+
+            copy.previousClaimContainer = previousClaimContainer;
+            copy.previousClaimSection = previousClaimSection;
+            copy.previousClaimIdentifier = previousClaimIdentifier;
+            copy.previousClaimOffset = previousClaimOffset;
+            copy.previousSize = previousSize;
+
+            copy.sourceQueueIdentifier = sourceQueueIdentifier;
+            copy.storageByteOffset = storageByteOffset;
+            copy.storageFilename = storageFilename;
+
+            return copy;
+        }
+
+
+        @Override
         public Builder setFlowFileEntryDate(final long entryDate) {
             this.entryDate = entryDate;
             return this;
@@ -581,10 +629,15 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
 
         @Override
         public Builder addChildFlowFile(final FlowFile childFlowFile) {
+            return addChildFlowFile(childFlowFile.getAttribute(CoreAttributes.UUID.key()));
+        }
+
+        @Override
+        public Builder addChildFlowFile(final String childId) {
             if (this.childrenUuids == null) {
                 this.childrenUuids = new ArrayList<>();
             }
-            this.childrenUuids.add(childFlowFile.getAttribute(CoreAttributes.UUID.key()));
+            this.childrenUuids.add(childId);
             return this;
         }
 
@@ -606,11 +659,6 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
             return this;
         }
 
-        public Builder setContentType(String contentType) {
-            this.contentType = contentType;
-            return this;
-        }
-
         @Override
         public Builder setAlternateIdentifierUri(String alternateIdentifierUri) {
             this.alternateIdentifierUri = alternateIdentifierUri;
@@ -723,5 +771,20 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
 
             return new StandardProvenanceEventRecord(this);
         }
+
+        @Override
+        public List<String> getChildFlowFileIds() {
+            return childrenUuids;
+        }
+
+        @Override
+        public List<String> getParentFlowFileIds() {
+            return parentUuids;
+        }
+
+        @Override
+        public String getFlowFileId() {
+            return uuid;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileSessionWrapper.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileSessionWrapper.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileSessionWrapper.java
deleted file mode 100644
index e2f17b3..0000000
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileSessionWrapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-
-public class FlowFileSessionWrapper {
-
-    private final FlowFile flowFile;
-    private final ProcessSession session;
-
-    public FlowFileSessionWrapper(final FlowFile flowFile, final ProcessSession session) {
-        this.flowFile = flowFile;
-        this.session = session;
-    }
-
-    public FlowFile getFlowFile() {
-        return flowFile;
-    }
-
-    public ProcessSession getSession() {
-        return session;
-    }
-
-    @Override
-    public String toString() {
-        return flowFile.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
index 35c225b..fdbc71f 100644
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
@@ -17,20 +17,20 @@
 package org.apache.nifi.processor.util.bin;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.util.FlowFileSessionWrapper;
 
 /**
  * Note: {@code Bin} objects are NOT thread safe. If multiple threads access a {@code Bin}, the caller must synchronize
  * access.
  */
 public class Bin {
-
+    private final ProcessSession session;
     private final long creationMomentEpochNs;
     private final long minimumSizeBytes;
     private final long maximumSizeBytes;
@@ -39,13 +39,14 @@ public class Bin {
     private volatile int maximumEntries = Integer.MAX_VALUE;
     private final String fileCountAttribute;
 
-    final List<FlowFileSessionWrapper> binContents = new ArrayList<>();
+    final List<FlowFile> binContents = new ArrayList<>();
     long size;
     int successiveFailedOfferings = 0;
 
     /**
      * Constructs a new bin
      *
+     * @param session the session
      * @param minSizeBytes min bytes
      * @param maxSizeBytes max bytes
      * @param minEntries min entries
@@ -53,7 +54,8 @@ public class Bin {
      * @param fileCountAttribute num files
      * @throws IllegalArgumentException if the min is not less than or equal to the max.
      */
-    public Bin(final long minSizeBytes, final long maxSizeBytes, final int minEntries, final int maxEntries, final String fileCountAttribute) {
+    public Bin(final ProcessSession session, final long minSizeBytes, final long maxSizeBytes, final int minEntries, final int maxEntries, final String fileCountAttribute) {
+        this.session = session;
         this.minimumSizeBytes = minSizeBytes;
         this.maximumSizeBytes = maxSizeBytes;
         this.minimumEntries = minEntries;
@@ -66,6 +68,10 @@ public class Bin {
         }
     }
 
+    public ProcessSession getSession() {
+        return session;
+    }
+
     /**
      * Indicates whether the bin has enough items to be considered full. This is based on whether the current size of the bin is greater than the minimum size in bytes and based on having a number of
      * successive unsuccessful attempts to add a new item (because it is so close to the max or the size of the objects being attempted do not favor tight packing)
@@ -132,7 +138,9 @@ public class Bin {
         }
 
         size += flowFile.getSize();
-        binContents.add(new FlowFileSessionWrapper(flowFile, session));
+
+        session.migrate(getSession(), Collections.singleton(flowFile));
+        binContents.add(flowFile);
         successiveFailedOfferings = 0;
         return true;
     }
@@ -157,7 +165,7 @@ public class Bin {
     /**
      * @return the underlying list of flow files within this bin
      */
-    public List<FlowFileSessionWrapper> getContents() {
+    public List<FlowFile> getContents() {
         return binContents;
     }
 

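With this change a Bin owns a dedicated ProcessSession for its entire lifetime, and offer() migrates each accepted FlowFile out of the session that pulled it from the queue and into the bin's session. That is what later allows a whole bin to be committed or rolled back as a single unit. A rough usage sketch, assuming a ProcessSessionFactory named sessionFactory:

    // Sketch only: pull a batch with a short-lived polling session and hand
    // each FlowFile to a bin backed by its own, longer-lived session.
    final ProcessSession pollingSession = sessionFactory.createSession();
    final List<FlowFile> flowFiles = pollingSession.get(100);

    final Bin bin = new Bin(sessionFactory.createSession(), 0L, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    for (final FlowFile flowFile : flowFiles) {
        // offer() invokes pollingSession.migrate(bin.getSession(), ...), so a
        // successfully offered FlowFile now belongs to the bin's session
        bin.offer(flowFile, pollingSession);
    }
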
http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index ea64d87..2672491 100644
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -19,8 +19,11 @@ package org.apache.nifi.processor.util.bin;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -38,7 +41,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.FlowFileSessionWrapper;
 import org.apache.nifi.processor.util.StandardValidators;
 
 /**
@@ -110,9 +112,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
 
         Bin bin;
         while ((bin = readyBins.poll()) != null) {
-            for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
-                wrapper.getSession().rollback();
-            }
+            bin.getSession().rollback();
         }
     }
 
@@ -146,17 +146,17 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
     /**
      * Processes a single bin. Implementing class is responsible for committing each session
      *
-     * @param unmodifiableBin A reference to a single bin of flow file/session wrappers
-     * @param binContents A copy of the contents of the bin
+     * @param unmodifiableBin A reference to a single bin of flow files
      * @param context The context
-     * @param session The session that created the bin
-     * @return Return true if the input bin was already committed. E.g., in case of a failure, the implementation may choose to transfer all binned files to Failure and commit their sessions. If
-     * false, the processBins() method will transfer the files to Original and commit the sessions
+     * @return <code>true</code> if the input bin was already committed. E.g., in case of a failure, the implementation
+     *         may choose to transfer all binned files to Failure and commit the bin's session. If
+     *         <code>false</code>, the processBins() method will transfer the files to Original and commit the session
      *
-     * @throws ProcessException if any problem arises while processing a bin of FlowFiles. All flow files in the bin will be transferred to failure and the ProcessSession provided by the 'session'
-     * argument rolled back
+     * @throws ProcessException if any problem arises while processing a bin of FlowFiles. All FlowFiles in the bin
+     *             will be transferred to failure and the bin's ProcessSession
+     *             committed
      */
-    protected abstract boolean processBin(Bin unmodifiableBin, List<FlowFileSessionWrapper> binContents, ProcessContext context, ProcessSession session) throws ProcessException;
+    protected abstract boolean processBin(Bin unmodifiableBin, ProcessContext context) throws ProcessException;
 
     /**
      * Allows additional custom validation to be done. This will be called from the parent's customValidation method.
@@ -188,7 +188,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
         }
 
         final int binsMigrated = migrateBins(context);
-        final int binsProcessed = processBins(context, sessionFactory);
+        final int binsProcessed = processBins(context);
         //If we accomplished nothing then let's yield
         if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
             context.yield();
@@ -215,7 +215,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
         return added;
     }
 
-    private int processBins(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
+    private int processBins(final ProcessContext context) {
         final Bin bin = readyBins.poll();
         if (bin == null) {
             return 0;
@@ -225,42 +225,31 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
         bins.add(bin);
 
         final ComponentLog logger = getLogger();
-        final ProcessSession session = sessionFactory.createSession();
-
-        final List<FlowFileSessionWrapper> binCopy = new ArrayList<>(bin.getContents());
 
         boolean binAlreadyCommitted = false;
         try {
-            binAlreadyCommitted = this.processBin(bin, binCopy, context, session);
+            binAlreadyCommitted = this.processBin(bin, context);
         } catch (final ProcessException e) {
-            logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
+            logger.error("Failed to process bundle of {} files due to {}", new Object[] {bin.getContents().size(), e});
 
-            for (final FlowFileSessionWrapper wrapper : binCopy) {
-                wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
-                wrapper.getSession().commit();
+            final ProcessSession binSession = bin.getSession();
+            for (final FlowFile flowFile : bin.getContents()) {
+                binSession.transfer(flowFile, REL_FAILURE);
             }
-            session.rollback();
+            binSession.commit();
             return 1;
         } catch (final Exception e) {
-            logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {binCopy.size(), e});
+            logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {bin.getContents().size(), e});
 
-            for (final FlowFileSessionWrapper wrapper : binCopy) {
-                wrapper.getSession().rollback();
-            }
-            session.rollback();
+            bin.getSession().rollback();
             return 1;
         }
 
-        // we first commit the bundle's session before the originals' sessions because if we are restarted or crash
-        // between commits, we favor data redundancy over data loss. Since we have no Distributed Transaction capability
-        // across multiple sessions, we cannot guarantee atomicity across the sessions
-        session.commit();
         // If this bin's session has been committed, move on.
         if (!binAlreadyCommitted) {
-            for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
-                wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL);
-                wrapper.getSession().commit();
-            }
+            final ProcessSession binSession = bin.getSession();
+            binSession.transfer(bin.getContents(), REL_ORIGINAL);
+            binSession.commit();
         }
 
         return 1;
@@ -274,25 +263,26 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
             }
 
             final ProcessSession session = sessionFactory.createSession();
-            FlowFile flowFile = session.get();
-            if (flowFile == null) {
+            final List<FlowFile> flowFiles = session.get(1000);
+            if (flowFiles.isEmpty()) {
                 break;
             }
 
-            flowFile = this.preprocessFlowFile(context, session, flowFile);
-
-            String groupId = this.getGroupId(context, flowFile);
-
-            final boolean binned = binManager.offer(groupId, flowFile, session);
-
-            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
-            if (!binned) {
-                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
-                bin.offer(flowFile, session);
-                this.readyBins.add(bin);
+            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
+            for (FlowFile flowFile : flowFiles) {
+                flowFile = this.preprocessFlowFile(context, session, flowFile);
+                final String groupingIdentifier = getGroupId(context, flowFile);
+                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
             }
 
-            flowFilesBinned++;
+            for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
+                final Set<FlowFile> unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
+                for (final FlowFile flowFile : unbinned) {
+                    Bin bin = new Bin(sessionFactory.createSession(), 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
+                    bin.offer(flowFile, session);
+                    this.readyBins.add(bin);
+                }
+            }
         }
 
         return flowFilesBinned;

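Under the new contract a BinFiles subclass receives only the Bin and the ProcessContext; the session it must work against hangs off the bin itself. A minimal sketch of an implementation, assuming a subclass that declares a failure relationship the way MergeContent does, with a hypothetical isValid() check standing in for real per-bin validation:

    // Sketch only: returning true means "the bin's session is already
    // committed"; returning false lets processBins() route the contents to
    // Original and commit on the implementation's behalf.
    @Override
    protected boolean processBin(final Bin bin, final ProcessContext context) throws ProcessException {
        final ProcessSession binSession = bin.getSession();
        final List<FlowFile> contents = bin.getContents();

        if (!isValid(contents)) {                // isValid() is hypothetical
            binSession.transfer(contents, REL_FAILURE);
            binSession.commit();
            return true;
        }

        return false;
    }
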
http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
index 6caaebb..d6a8567 100644
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
@@ -19,8 +19,10 @@ package org.apache.nifi.processor.util.bin;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -30,7 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.util.FlowFileSessionWrapper;
+import org.apache.nifi.processor.ProcessSessionFactory;
 
 /**
  * This class is thread safe
@@ -60,9 +62,7 @@ public class BinManager {
         try {
             for (final List<Bin> binList : groupBinMap.values()) {
                 for (final Bin bin : binList) {
-                    for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
-                        wrapper.getSession().rollback();
-                    }
+                    bin.getSession().rollback();
                 }
             }
             groupBinMap.clear();
@@ -108,12 +108,15 @@ public class BinManager {
     /**
      * Adds the given flowFile to the first available bin in which it fits for the given group or creates a new bin in the specified group if necessary.
      * <p/>
+     *
      * @param groupIdentifier the group to which the flow file belongs; can be null
      * @param flowFile the flow file to bin
      * @param session the ProcessSession to which the FlowFile belongs
+     * @param sessionFactory a ProcessSessionFactory that can be used to create a new ProcessSession in order to
+     *            create a new bin if necessary
      * @return true if added; false if no bin exists which can fit this item and no bin can be created based on current min/max criteria
      */
-    public boolean offer(final String groupIdentifier, final FlowFile flowFile, final ProcessSession session) {
+    public boolean offer(final String groupIdentifier, final FlowFile flowFile, final ProcessSession session, final ProcessSessionFactory sessionFactory) {
         final long currentMaxSizeBytes = maxSizeBytes.get();
         if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit into any new bins (and probably none existing)
             return false;
@@ -123,7 +126,8 @@ public class BinManager {
             final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
             if (currentBins == null) { // this is a new group we need to register
                 final List<Bin> bins = new ArrayList<>();
-                final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), maxEntries.get(), fileCountAttribute.get());
+                final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
+                    maxEntries.get(), fileCountAttribute.get());
                 bins.add(bin);
                 groupBinMap.put(groupIdentifier, bins);
                 binCount++;
@@ -137,7 +141,8 @@ public class BinManager {
                 }
 
                 //if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one
-                final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), maxEntries.get(), fileCountAttribute.get());
+                final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
+                    maxEntries.get(), fileCountAttribute.get());
                 currentBins.add(bin);
                 binCount++;
                 return bin.offer(flowFile, session);
@@ -148,6 +153,71 @@ public class BinManager {
     }
 
     /**
+     * Adds each of the given flowFiles to the first available bin in which it fits for the given group, creating a new bin in the specified group if necessary.
+     * <p/>
+     *
+     * @param groupIdentifier the group to which the flow file belongs; can be null
+     * @param flowFiles the flow files to bin
+     * @param session the ProcessSession to which the FlowFiles belong
+     * @param sessionFactory a ProcessSessionFactory that can be used to create a new ProcessSession in order to
+     *            create a new bin if necessary
+     * @return all of the FlowFiles that could not be successfully binned
+     */
+    public Set<FlowFile> offer(final String groupIdentifier, final Collection<FlowFile> flowFiles, final ProcessSession session, final ProcessSessionFactory sessionFactory) {
+        final long currentMaxSizeBytes = maxSizeBytes.get();
+        final Set<FlowFile> unbinned = new HashSet<>();
+
+        wLock.lock();
+        try {
+            flowFileLoop: for (final FlowFile flowFile : flowFiles) {
+                if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit into any new bins (and probably none existing)
+                    unbinned.add(flowFile);
+                    continue;
+                }
+
+                final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
+                if (currentBins == null) { // this is a new group we need to register
+                    final List<Bin> bins = new ArrayList<>();
+                    final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
+                        maxEntries.get(), fileCountAttribute.get());
+                    bins.add(bin);
+                    groupBinMap.put(groupIdentifier, bins);
+                    binCount++;
+
+                    final boolean added = bin.offer(flowFile, session);
+                    if (!added) {
+                        unbinned.add(flowFile);
+                    }
+                    continue;
+                } else {
+                    for (final Bin bin : currentBins) {
+                        final boolean accepted = bin.offer(flowFile, session);
+                        if (accepted) {
+                            continue flowFileLoop;
+                        }
+                    }
+
+                    //if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one
+                    final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
+                        maxEntries.get(), fileCountAttribute.get());
+                    currentBins.add(bin);
+                    binCount++;
+                    final boolean added = bin.offer(flowFile, session);
+                    if (!added) {
+                        unbinned.add(flowFile);
+                    }
+
+                    continue;
+                }
+            }
+        } finally {
+            wLock.unlock();
+        }
+
+        return unbinned;
+    }
+
+    /**
      * Finds all bins that are considered full and removes them from the manager.
      * <p/>
      * @param relaxFullnessConstraint if false will require bins to be full before considered ready; if true bins only have to meet their minimum size criteria or be 'old' and then they'll be

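The batch offer() reports failures instead of returning a boolean: anything it hands back could not be placed, typically because the FlowFile by itself exceeds the maximum bin size. The caller is expected to give each reject its own bin, as binFlowFiles() does above. A sketch of that calling pattern, with binManager, session, sessionFactory and readyBins assumed to exist as they do in BinFiles:

    // Sketch only: bin a group, then isolate anything that would not fit.
    final Set<FlowFile> unbinned = binManager.offer(groupId, flowFiles, session, sessionFactory);
    for (final FlowFile oversized : unbinned) {
        final Bin bin = new Bin(sessionFactory.createSession(), 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
        bin.offer(oversized, session);
        readyBins.add(bin);
    }
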
http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index dbac129..3228e1f 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -39,6 +39,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
@@ -68,14 +69,15 @@ public class MockProcessSession implements ProcessSession {
     private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
     private final SharedSessionState sharedState;
     private final Map<String, Long> counterMap = new HashMap<>();
+    private final Set<FlowFile> recursionSet = new HashSet<>();
     private final MockProvenanceReporter provenanceReporter;
 
     // A List of InputStreams that have been created by calls to {@link #read(FlowFile)} and have not yet been closed.
-    private final List<InputStream> openInputStreams = new ArrayList<>();
+    private final Map<FlowFile, InputStream> openInputStreams = new HashMap<>();
 
     private boolean committed = false;
     private boolean rolledback = false;
-    private int removedCount = 0;
+    private final Set<Long> removedFlowFiles = new HashSet<>();
 
     public MockProcessSession(final SharedSessionState sharedState, final Processor processor) {
         this.processor = processor;
@@ -103,6 +105,77 @@ public class MockProcessSession implements ProcessSession {
     }
 
     @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void migrate(final ProcessSession newOwner, final Collection<FlowFile> flowFiles) {
+        if (Objects.requireNonNull(newOwner) == this) {
+            throw new IllegalArgumentException("Cannot migrate FlowFiles from a Process Session to itself");
+        }
+        if (flowFiles == null || flowFiles.isEmpty()) {
+            throw new IllegalArgumentException("Must supply at least one FlowFile to migrate");
+        }
+
+        if (!(newOwner instanceof MockProcessSession)) {
+            throw new IllegalArgumentException("Cannot migrate from a MockProcessSession to a session of type " + newOwner.getClass());
+        }
+
+        migrate((MockProcessSession) newOwner, (Collection<MockFlowFile>) (Collection) flowFiles);
+    }
+
+    private void migrate(final MockProcessSession newOwner, final Collection<MockFlowFile> flowFiles) {
+        for (final FlowFile flowFile : flowFiles) {
+            if (openInputStreams.containsKey(flowFile)) {
+                throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
+                    + "has an open Input Stream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
+            }
+
+            if (recursionSet.contains(flowFile)) {
+                throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
+            }
+
+            ensureCurrentVersion(flowFile);
+        }
+
+        for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) {
+            final Relationship relationship = entry.getKey();
+            final List<MockFlowFile> transferredFlowFiles = entry.getValue();
+
+            for (final MockFlowFile flowFile : flowFiles) {
+                if (transferredFlowFiles.remove(flowFile)) {
+                    newOwner.transferMap.computeIfAbsent(relationship, rel -> new ArrayList<>()).add(flowFile);
+                }
+            }
+        }
+
+        for (final MockFlowFile flowFile : flowFiles) {
+            if (beingProcessed.remove(flowFile.getId())) {
+                newOwner.beingProcessed.add(flowFile.getId());
+            }
+
+            if (penalized.remove(flowFile)) {
+                newOwner.penalized.add(flowFile);
+            }
+
+            if (currentVersions.containsKey(flowFile.getId())) {
+                newOwner.currentVersions.put(flowFile.getId(), currentVersions.remove(flowFile.getId()));
+            }
+
+            if (originalVersions.containsKey(flowFile.getId())) {
+                newOwner.originalVersions.put(flowFile.getId(), originalVersions.remove(flowFile.getId()));
+            }
+
+            if (removedFlowFiles.remove(flowFile.getId())) {
+                newOwner.removedFlowFiles.add(flowFile.getId());
+            }
+        }
+
+        final Set<String> flowFileIds = flowFiles.stream()
+            .map(ff -> ff.getAttribute(CoreAttributes.UUID.key()))
+            .collect(Collectors.toSet());
+
+        provenanceReporter.migrate(newOwner.provenanceReporter, flowFileIds);
+    }
+
+    @Override
     public MockFlowFile clone(final FlowFile flowFile) {
         validateState(flowFile);
         final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile);
@@ -134,7 +207,7 @@ public class MockProcessSession implements ProcessSession {
         }
 
         if (!openInputStreams.isEmpty()) {
-            final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List
+            final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams.values()); // avoid ConcurrentModificationException by copying the open streams
             for (final InputStream openInputStream : openStreamCopy) {
                 try {
                     openInputStream.close();
@@ -436,6 +509,7 @@ public class MockProcessSession implements ProcessSession {
         final MockFlowFile mock = (MockFlowFile) flowFile;
 
         final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
+        recursionSet.add(flowFile);
         try {
             callback.process(bais);
             if(!allowSessionStreamManagement){
@@ -443,6 +517,8 @@ public class MockProcessSession implements ProcessSession {
             }
         } catch (final IOException e) {
             throw new ProcessException(e.toString(), e);
+        } finally {
+            recursionSet.remove(flowFile);
         }
     }
 
@@ -472,7 +548,7 @@ public class MockProcessSession implements ProcessSession {
 
             @Override
             public void close() throws IOException {
-                openInputStreams.remove(this);
+                openInputStreams.remove(flowFile);
                 bais.close();
             }
 
@@ -482,7 +558,7 @@ public class MockProcessSession implements ProcessSession {
             }
         };
 
-        openInputStreams.add(errorHandlingStream);
+        openInputStreams.put(flowFile, errorHandlingStream);
         return errorHandlingStream;
     }
 
@@ -506,7 +582,7 @@ public class MockProcessSession implements ProcessSession {
             if (ffId != null && ffId.equals(flowFile.getId())) {
                 processedItr.remove();
                 beingProcessed.remove(ffId);
-                removedCount++;
+                removedFlowFiles.add(flowFile.getId());
                 currentVersions.remove(ffId);
                 return;
             }
@@ -594,7 +670,7 @@ public class MockProcessSession implements ProcessSession {
         if(committed){
             return;
         }
-        final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List
+        final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams.values()); // avoid ConcurrentModificationException by copying the open streams
         for (final InputStream openInputStream : openStreamCopy) {
             try {
                 openInputStream.close();
@@ -709,10 +785,13 @@ public class MockProcessSession implements ProcessSession {
         final MockFlowFile mock = (MockFlowFile) flowFile;
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        recursionSet.add(flowFile);
         try {
             callback.process(baos);
         } catch (final IOException e) {
             throw new ProcessException(e.toString(), e);
+        } finally {
+            recursionSet.remove(flowFile);
         }
 
         final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
@@ -761,10 +840,14 @@ public class MockProcessSession implements ProcessSession {
 
         final ByteArrayInputStream in = new ByteArrayInputStream(mock.getData());
         final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+        recursionSet.add(flowFile);
         try {
             callback.process(in, out);
         } catch (final IOException e) {
             throw new ProcessException(e.toString(), e);
+        } finally {
+            recursionSet.remove(flowFile);
         }
 
         final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
@@ -861,6 +944,19 @@ public class MockProcessSession implements ProcessSession {
 
     private void validateState(final FlowFile flowFile) {
         Objects.requireNonNull(flowFile);
+        ensureCurrentVersion(flowFile);
+        if (recursionSet.contains(flowFile)) {
+            throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
+        }
+
+        for (final List<MockFlowFile> flowFiles : transferMap.values()) {
+            if (flowFiles.contains(flowFile)) {
+                throw new IllegalStateException(flowFile + " has already been transferred");
+            }
+        }
+    }
+
+    private void ensureCurrentVersion(final FlowFile flowFile) {
         final FlowFile currentVersion = currentVersions.get(flowFile.getId());
         if (currentVersion == null) {
             throw new FlowFileHandlingException(flowFile + " is not known in this session");
@@ -869,12 +965,6 @@ public class MockProcessSession implements ProcessSession {
         if (currentVersion != flowFile) {
             throw new FlowFileHandlingException(flowFile + " is not the most recent version of this flow file within this session");
         }
-
-        for (final List<MockFlowFile> flowFiles : transferMap.values()) {
-            if (flowFiles.contains(flowFile)) {
-                throw new IllegalStateException(flowFile + " has already been transferred");
-            }
-        }
     }
 
     /**
@@ -1133,7 +1223,7 @@ public class MockProcessSession implements ProcessSession {
      * @return the number of FlowFiles that were removed
      */
     public int getRemovedCount() {
-        return removedCount;
+        return removedFlowFiles.size();
     }
 
     @Override

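The mock now enforces the same migrate() semantics as the standard session, so processor tests can exercise FlowFile hand-off between sessions directly. A test sketch, assuming a Processor instance named processor and a hypothetical REL_SUCCESS relationship; SharedSessionState is constructed as elsewhere in nifi-mock:

    // Sketch only: after migrate(), only the new owner knows the FlowFile.
    final SharedSessionState state = new SharedSessionState(processor, new AtomicLong(0L));
    final MockProcessSession first = new MockProcessSession(state, processor);
    final MockProcessSession second = new MockProcessSession(state, processor);

    final FlowFile flowFile = first.create();
    first.migrate(second, Collections.singleton(flowFile));

    second.transfer(flowFile, REL_SUCCESS);  // REL_SUCCESS is hypothetical
    second.commit();
    first.commit();                          // 'first' no longer tracks the FlowFile
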
http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
index 8458715..8dc06f1 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
@@ -71,6 +71,18 @@ public class MockProvenanceReporter implements ProvenanceReporter {
         events.clear();
     }
 
+    void migrate(final MockProvenanceReporter newOwner, final Set<String> flowFileIds) {
+        final Set<ProvenanceEventRecord> toMove = new LinkedHashSet<>();
+        for (final ProvenanceEventRecord event : events) {
+            if (flowFileIds.contains(event.getFlowFileUuid())) {
+                toMove.add(event);
+            }
+        }
+
+        events.removeAll(toMove);
+        newOwner.events.addAll(toMove);
+    }
+
     /**
      * Generates a Fork event for the given child and parents but does not
      * register the event. This is useful so that a ProcessSession has the

http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
index 63b89eb..90c9dc7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
@@ -73,6 +73,11 @@ public class BatchingSessionFactory implements ProcessSessionFactory {
         }
 
         @Override
+        public void migrate(ProcessSession newOwner, Collection<FlowFile> flowFiles) {
+            session.migrate(newOwner, flowFiles);
+        }
+
+        @Override
         public void adjustCounter(String name, long delta, boolean immediate) {
             session.adjustCounter(name, delta, immediate);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index f851fdd..a2a68d5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -33,11 +33,13 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.connectable.Connectable;
@@ -102,13 +104,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     private static final int MAX_ROLLBACK_FLOWFILES_TO_LOG = 5;
 
     private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
-    private final Map<Connection, StandardFlowFileEvent> connectionCounts = new HashMap<>();
-    private final Map<Connection, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
+    private final Map<String, StandardFlowFileEvent> connectionCounts = new HashMap<>();
+    private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
     private final Map<String, Long> counters = new HashMap<>();
     private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>();
     private final ProcessContext context;
     private final Set<FlowFile> recursionSet = new HashSet<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
-    private final Set<Path> deleteOnCommit = new HashSet<>();
+    private final Map<FlowFile, Path> deleteOnCommit = new HashMap<>();
     private final long sessionId;
     private final String connectableDescription;
 
@@ -129,7 +131,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     private long processingStartTime;
 
     // List of InputStreams that have been opened by calls to {@link #read(FlowFile)} and not yet closed
-    private final List<InputStream> openInputStreams = new ArrayList<>();
+    private final Map<FlowFile, InputStream> openInputStreams = new HashMap<>();
 
     // maps a FlowFile to all Provenance Events that were generated for that FlowFile.
     // we do this so that if we generate a Fork event, for example, and then remove the event in the same
@@ -184,11 +186,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     public void checkpoint() {
+
         resetWriteClaims(false);
 
-        final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List
-        for (final InputStream openStream : openStreamCopy) {
-            LOG.warn("{} closing {} for {} because the session was committed without the stream being closed.", this, openStream, this.connectableDescription);
+        final Map<FlowFile, InputStream> openStreamCopy = new HashMap<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the Map
+        for (final Map.Entry<FlowFile, InputStream> entry : openStreamCopy.entrySet()) {
+            final FlowFile flowFile = entry.getKey();
+            final InputStream openStream = entry.getValue();
+
+            LOG.warn("{} closing {} for {} because the session was committed without the stream being closed.", this, openStream, flowFile);
 
             try {
                 openStream.close();
@@ -397,7 +403,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos;
 
             // Delete any files from disk that need to be removed.
-            for (final Path path : checkpoint.deleteOnCommit) {
+            for (final Path path : checkpoint.deleteOnCommit.values()) {
                 try {
                     Files.deleteIfExists(path);
                 } catch (final IOException e) {
@@ -884,9 +890,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         deleteOnCommit.clear();
 
-        final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List
-        for (final InputStream openStream : openStreamCopy) {
-            LOG.debug("{} closing {} for {} due to session rollback", this, openStream, this.connectableDescription);
+        final Map<FlowFile, InputStream> openStreamCopy = new HashMap<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the Map
+        for (final Map.Entry<FlowFile, InputStream> entry : openStreamCopy.entrySet()) {
+            final FlowFile flowFile = entry.getKey();
+            final InputStream openStream = entry.getValue();
+
+            LOG.debug("{} closing {} for {} due to session rollback", this, openStream, flowFile);
             try {
                 openStream.close();
             } catch (final Exception e) {
@@ -905,15 +914,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
         }
 
+        resetWriteClaims();
+        resetReadClaim();
+
         if (recordsToHandle.isEmpty()) {
             LOG.trace("{} was rolled back, but no events were performed by this ProcessSession", this);
             acknowledgeRecords();
+            resetState();
             return;
         }
 
-        resetWriteClaims();
-        resetReadClaim();
-
         for (final StandardRepositoryRecord record : recordsToHandle) {
             // remove the working claims if they are different than the originals.
             removeTemporaryClaim(record);
@@ -1069,12 +1079,170 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private void acknowledgeRecords() {
-        for (final Map.Entry<Connection, Set<FlowFileRecord>> entry : unacknowledgedFlowFiles.entrySet()) {
-            entry.getKey().getFlowFileQueue().acknowledge(entry.getValue());
+        for (final Map.Entry<FlowFileQueue, Set<FlowFileRecord>> entry : unacknowledgedFlowFiles.entrySet()) {
+            entry.getKey().acknowledge(entry.getValue());
         }
         unacknowledgedFlowFiles.clear();
     }
 
+    @Override
+    public void migrate(final ProcessSession newOwner, final Collection<FlowFile> flowFiles) {
+        if (Objects.requireNonNull(newOwner) == this) {
+            throw new IllegalArgumentException("Cannot migrate FlowFiles from a Process Session to itself");
+        }
+
+        if (flowFiles == null || flowFiles.isEmpty()) {
+            throw new IllegalArgumentException("Must supply at least one FlowFile to migrate");
+        }
+
+        if (!(newOwner instanceof StandardProcessSession)) {
+            throw new IllegalArgumentException("Cannot migrate from a StandardProcessSession to a " + newOwner.getClass());
+        }
+
+        migrate((StandardProcessSession) newOwner, flowFiles);
+    }
+
+    private void migrate(final StandardProcessSession newOwner, final Collection<FlowFile> flowFiles) {
+        // We don't call validateRecordState() here because we want to allow migration of FlowFiles that have already been marked as removed or transferred, etc.
+        for (final FlowFile flowFile : flowFiles) {
+            if (openInputStreams.containsKey(flowFile)) {
+                throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
+                    + "has an open Input Stream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
+            }
+
+            if (recursionSet.contains(flowFile)) {
+                throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
+            }
+            final StandardRepositoryRecord record = records.get(flowFile);
+            if (record == null) {
+                throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
+            }
+            if (record.getCurrent() != flowFile) {
+                throw new FlowFileHandlingException(flowFile + " is not the most recent version of this FlowFile within this session (" + toString() + ")");
+            }
+        }
+
+        // If we have a FORK event for one of the given FlowFiles, then all children must also be migrated. Otherwise, we
+        // could have a case where we have FlowFile A transferred and eventually exiting the flow and later the 'newOwner'
+        // ProcessSession is committed, claiming to have created FlowFiles from the parent, which is no longer even in
+        // the flow. This would be very confusing when looking at the provenance for the FlowFile, so it is best to avoid this.
+        final Set<String> flowFileIds = flowFiles.stream()
+            .map(ff -> ff.getAttribute(CoreAttributes.UUID.key()))
+            .collect(Collectors.toSet());
+
+        for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : forkEventBuilders.entrySet()) {
+            final FlowFile eventFlowFile = entry.getKey();
+            if (flowFiles.contains(eventFlowFile)) {
+                final ProvenanceEventBuilder eventBuilder = entry.getValue();
+                for (final String childId : eventBuilder.getChildFlowFileIds()) {
+                    if (!flowFileIds.contains(childId)) {
+                        throw new IllegalStateException("Cannot migrate " + eventFlowFile + " to a new session because it was forked to create " + eventBuilder.getChildFlowFileIds().size()
+                            + " children and not all children are being migrated. If any FlowFile is forked, all of its children must also be migrated at the same time as the forked FlowFile");
+                    }
+                }
+            }
+        }
+
+        // If we have a FORK event where a FlowFile is a child of the FORK event, we want to create a FORK
+        // event builder for the new owner of the FlowFile and remove the child from our fork event builder.
+        for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : forkEventBuilders.entrySet()) {
+            final FlowFile eventFlowFile = entry.getKey();
+            final ProvenanceEventBuilder eventBuilder = entry.getValue();
+
+            final Set<String> childrenIds = new HashSet<>(eventBuilder.getChildFlowFileIds());
+
+            ProvenanceEventBuilder copy = null;
+            for (final FlowFile flowFile : flowFiles) {
+                final String flowFileId = flowFile.getAttribute(CoreAttributes.UUID.key());
+                if (childrenIds.contains(flowFileId)) {
+                    eventBuilder.removeChildFlowFile(flowFile);
+
+                    if (copy == null) {
+                        copy = eventBuilder.copy();
+                        copy.getChildFlowFileIds().clear();
+                    }
+                    copy.addChildFlowFile(flowFileId);
+                }
+            }
+
+            if (copy != null) {
+                newOwner.forkEventBuilders.put(eventFlowFile, copy);
+            }
+        }
+
+        newOwner.processingStartTime = Math.min(newOwner.processingStartTime, processingStartTime);
+
+        for (final FlowFile flowFile : flowFiles) {
+            final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
+
+            final StandardRepositoryRecord repoRecord = this.records.remove(flowFile);
+            newOwner.records.put(flowFileRecord, repoRecord);
+
+            // Adjust the counts for Connections for each FlowFile that was pulled from a Connection.
+            // We do not have to worry about accounting for 'input counts' on connections because those
+            // are incremented only during a checkpoint, and anything that's been checkpointed has
+            // also been committed above.
+            final FlowFileQueue inputQueue = repoRecord.getOriginalQueue();
+            if (inputQueue != null) {
+                final String connectionId = inputQueue.getIdentifier();
+                incrementConnectionOutputCounts(connectionId, -1, -repoRecord.getOriginal().getSize());
+                newOwner.incrementConnectionOutputCounts(connectionId, 1, repoRecord.getOriginal().getSize());
+
+                unacknowledgedFlowFiles.get(inputQueue).remove(flowFile);
+                newOwner.unacknowledgedFlowFiles.computeIfAbsent(inputQueue, queue -> new HashSet<>()).add(flowFileRecord);
+
+                flowFilesIn--;
+                contentSizeIn -= flowFile.getSize();
+
+                newOwner.flowFilesIn++;
+                newOwner.contentSizeIn += flowFile.getSize();
+            }
+
+            final String flowFileId = flowFile.getAttribute(CoreAttributes.UUID.key());
+            if (removedFlowFiles.remove(flowFileId)) {
+                newOwner.removedFlowFiles.add(flowFileId);
+                newOwner.removedCount++;
+                newOwner.removedBytes += flowFile.getSize();
+
+                removedCount--;
+                removedBytes -= flowFile.getSize();
+            }
+
+            if (createdFlowFiles.remove(flowFileId)) {
+                newOwner.createdFlowFiles.add(flowFileId);
+            }
+
+            if (repoRecord.getTransferRelationship() != null) {
+                flowFilesOut--;
+                contentSizeOut -= flowFile.getSize();
+
+                newOwner.flowFilesOut++;
+                newOwner.contentSizeOut += flowFile.getSize();
+            }
+
+            final List<ProvenanceEventRecord> events = generatedProvenanceEvents.remove(flowFile);
+            if (events != null) {
+                newOwner.generatedProvenanceEvents.put(flowFile, events);
+            }
+
+            final ContentClaim currentClaim = repoRecord.getCurrentClaim();
+            if (currentClaim != null) {
+                final ByteCountingOutputStream appendableStream = appendableStreams.remove(currentClaim);
+                if (appendableStream != null) {
+                    newOwner.appendableStreams.put(currentClaim, appendableStream);
+                }
+            }
+
+            final Path toDelete = deleteOnCommit.remove(flowFile);
+            if (toDelete != null) {
+                newOwner.deleteOnCommit.put(flowFile, toDelete);
+            }
+        }
+
+        provenanceReporter.migrate(newOwner.provenanceReporter, flowFileIds);
+    }
+
+
     private String summarizeEvents(final Checkpoint checkpoint) {
         final Map<Relationship, Set<String>> transferMap = new HashMap<>(); // relationship to flowfile ID's
         final Set<String> modifiedFlowFileIds = new HashSet<>();
@@ -1190,23 +1358,23 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private void incrementConnectionInputCounts(final Connection connection, final RepositoryRecord record) {
-        StandardFlowFileEvent connectionEvent = connectionCounts.get(connection);
-        if (connectionEvent == null) {
-            connectionEvent = new StandardFlowFileEvent(connection.getIdentifier());
-            connectionCounts.put(connection, connectionEvent);
-        }
-        connectionEvent.setContentSizeIn(connectionEvent.getContentSizeIn() + record.getCurrent().getSize());
-        connectionEvent.setFlowFilesIn(connectionEvent.getFlowFilesIn() + 1);
+        incrementConnectionInputCounts(connection.getIdentifier(), 1, record.getCurrent().getSize());
+    }
+
+    private void incrementConnectionInputCounts(final String connectionId, final int flowFileCount, final long bytes) {
+        final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent(id));
+        connectionEvent.setContentSizeIn(connectionEvent.getContentSizeIn() + bytes);
+        connectionEvent.setFlowFilesIn(connectionEvent.getFlowFilesIn() + flowFileCount);
     }
 
     private void incrementConnectionOutputCounts(final Connection connection, final FlowFileRecord record) {
-        StandardFlowFileEvent connectionEvent = connectionCounts.get(connection);
-        if (connectionEvent == null) {
-            connectionEvent = new StandardFlowFileEvent(connection.getIdentifier());
-            connectionCounts.put(connection, connectionEvent);
-        }
-        connectionEvent.setContentSizeOut(connectionEvent.getContentSizeOut() + record.getSize());
-        connectionEvent.setFlowFilesOut(connectionEvent.getFlowFilesOut() + 1);
+        incrementConnectionOutputCounts(connection.getIdentifier(), 1, record.getSize());
+    }
+
+    private void incrementConnectionOutputCounts(final String connectionId, final int flowFileCount, final long bytes) {
+        final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent(id));
+        connectionEvent.setContentSizeOut(connectionEvent.getContentSizeOut() + bytes);
+        connectionEvent.setFlowFilesOut(connectionEvent.getFlowFilesOut() + flowFileCount);
     }
 
     private void registerDequeuedRecord(final FlowFileRecord flowFile, final Connection connection) {
@@ -1215,10 +1383,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         flowFilesIn++;
         contentSizeIn += flowFile.getSize();
 
-        Set<FlowFileRecord> set = unacknowledgedFlowFiles.get(connection);
+        Set<FlowFileRecord> set = unacknowledgedFlowFiles.get(connection.getFlowFileQueue());
         if (set == null) {
             set = new HashSet<>();
-            unacknowledgedFlowFiles.put(connection, set);
+            unacknowledgedFlowFiles.put(connection.getFlowFileQueue(), set);
         }
         set.add(flowFile);
 
@@ -1991,7 +2159,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             @Override
             public void close() throws IOException {
                 ffais.close();
-                openInputStreams.remove(this);
+                openInputStreams.remove(source);
             }
 
             @Override
@@ -2025,7 +2193,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
         };
 
-        openInputStreams.add(errorHandlingStream);
+        openInputStreams.put(source, errorHandlingStream);
         return errorHandlingStream;
     }
 
@@ -2477,9 +2645,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
             .build();
         record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName());
+
         if (!keepSourceFile) {
-            deleteOnCommit.add(source);
+            deleteOnCommit.put(newFile, source);
         }
+
         return newFile;
     }
 
@@ -2806,11 +2976,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         private final Set<ProvenanceEventRecord> reportedEvents = new LinkedHashSet<>();
 
         private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
-        private final Map<Connection, StandardFlowFileEvent> connectionCounts = new HashMap<>();
-        private final Map<Connection, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
+        private final Map<String, StandardFlowFileEvent> connectionCounts = new HashMap<>();
+        private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
         private final Map<String, Long> counters = new HashMap<>();
 
-        private final Set<Path> deleteOnCommit = new HashSet<>();
+        private final Map<FlowFile, Path> deleteOnCommit = new HashMap<>();
         private final Set<String> removedFlowFiles = new HashSet<>();
         private final Set<String> createdFlowFiles = new HashSet<>();
 
@@ -2836,7 +3006,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
             this.counters.putAll(session.counters);
 
-            this.deleteOnCommit.addAll(session.deleteOnCommit);
+            this.deleteOnCommit.putAll(session.deleteOnCommit);
             this.removedFlowFiles.addAll(session.removedFlowFiles);
             this.createdFlowFiles.addAll(session.createdFlowFiles);
 

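Taken together, migrate() moves everything the session tracks for a FlowFile in one step: the repository record, fork-event builders, per-connection counts, the unacknowledged-queue entry, generated provenance events, appendable content streams, and any pending delete-on-commit path. The calling pattern it enables looks roughly like this (a sketch, not code from this commit; sessionFactory is assumed):

    // Sketch only: hand freshly pulled FlowFiles to a longer-lived session so
    // the polling session can be committed or reused immediately.
    final ProcessSession pollingSession = sessionFactory.createSession();
    final List<FlowFile> pulled = pollingSession.get(1000);

    final ProcessSession longLived = sessionFactory.createSession();
    if (!pulled.isEmpty()) {                 // migrate() rejects empty collections
        pollingSession.migrate(longLived, pulled);
    }

    // pollingSession no longer knows these FlowFiles; longLived now owns their
    // records, provenance events and queue-acknowledgement duties.
    pollingSession.commit();
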
http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
index 8a89dbf..2b60013 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
@@ -69,6 +69,18 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
         events.clear();
     }
 
+    void migrate(final StandardProvenanceReporter newOwner, final Collection<String> flowFileIds) {
+        final Set<ProvenanceEventRecord> toMove = new LinkedHashSet<>();
+        for (final ProvenanceEventRecord event : events) {
+            if (flowFileIds.contains(event.getFlowFileUuid())) {
+                toMove.add(event);
+            }
+        }
+
+        events.removeAll(toMove);
+        newOwner.events.addAll(toMove);
+    }
+
     /**
      * Generates a Fork event for the given child and parents but does not register the event. This is useful so that a ProcessSession has the ability to de-dupe events, since one or more events may
      * be created by the session itself, as well as by the Processor

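The package-private migrate() added above moves any not-yet-committed provenance events for the given FlowFile UUIDs from this reporter to the receiving reporter. A hedged sketch of the caller side, assuming the owning session gathers the UUIDs of the FlowFiles it is migrating (MigrationIds and uuidsOf are illustrative helpers, not code from this commit):

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.flowfile.attributes.CoreAttributes;

    // Illustrative helper: collect the UUIDs of the FlowFiles being handed
    // off, in the form migrate(newOwner, flowFileIds) above expects.
    final class MigrationIds {
        static Set<String> uuidsOf(final Collection<FlowFile> flowFiles) {
            final Set<String> ids = new HashSet<>();
            for (final FlowFile flowFile : flowFiles) {
                ids.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
            }
            return ids;
        }
    }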
http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 7bb9035..83906e2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -64,8 +64,10 @@ public class StandardProcessContext implements ProcessContext, ControllerService
                 value = desc.getDefaultValue();
             }
 
-            final PreparedQuery pq = Query.prepare(value);
-            preparedQueries.put(desc, pq);
+            if (value != null) {
+                final PreparedQuery pq = Query.prepare(value);
+                preparedQueries.put(desc, pq);
+            }
         }
     }
 

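The null check above keeps Query.prepare(null) from being invoked for a property that has neither a configured value nor a default; such properties are simply left out of preparedQueries. One consequence worth noting, sketched with illustrative names (descriptor and rawValue are not this class's actual fields):

    // preparedQueries now only contains entries for non-null values, so a
    // miss can mean "property unset" as well as "not pre-compiled"; a
    // lookup has to account for both. Illustrative fragment only.
    PreparedQuery pq = preparedQueries.get(descriptor);
    if (pq == null && rawValue != null) {
        pq = Query.prepare(rawValue); // compile on demand
    }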
http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index a286024..8cc088d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -39,6 +39,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1397,6 +1398,37 @@ public class TestStandardProcessSession {
         }
     }
 
+    @Test
+    public void testMigrateWithAppendableStream() throws IOException {
+        FlowFile flowFile = session.create();
+        flowFile = session.append(flowFile, out -> out.write("1".getBytes()));
+        flowFile = session.append(flowFile, out -> out.write("2".getBytes()));
+
+        final StandardProcessSession newSession = new StandardProcessSession(context);
+
+        assertTrue(session.isFlowFileKnown(flowFile));
+        assertFalse(newSession.isFlowFileKnown(flowFile));
+
+        session.migrate(newSession, Collections.singleton(flowFile));
+
+        assertFalse(session.isFlowFileKnown(flowFile));
+        assertTrue(newSession.isFlowFileKnown(flowFile));
+
+        flowFile = newSession.append(flowFile, out -> out.write("3".getBytes()));
+
+        final byte[] buff = new byte[3];
+        try (final InputStream in = newSession.read(flowFile)) {
+            StreamUtils.fillBuffer(in, buff, true);
+            assertEquals(-1, in.read());
+        }
+
+        assertTrue(Arrays.equals(new byte[] {'1', '2', '3'}, buff));
+
+        newSession.remove(flowFile);
+        newSession.commit();
+        session.commit();
+    }
+
     private static class MockFlowFileRepository implements FlowFileRepository {
 
         private boolean failOnUpdate = false;

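The test above pins down the core contract of the new migrate(): ownership moves atomically (isFlowFileKnown flips from the old session to the new one), and content written through the old session, including the appendable stream behind append(), stays readable and appendable through the new one. From a processor's point of view the call looks like this hedged sketch (sessionFactory and the surrounding context are illustrative; the migrate signature matches the test above):

    // Hedged sketch: hand a FlowFile, and all session state tied to it,
    // over to a longer-lived session (e.g. a bin's session). After the
    // call, only binSession may transfer or commit the FlowFile.
    final ProcessSession binSession = sessionFactory.createSession();
    final FlowFile flowFile = session.get();
    if (flowFile != null) {
        session.migrate(binSession, Collections.singleton(flowFile));
    }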
http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index e53ddc9..edb33dc 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -379,7 +379,7 @@ public class PutHiveStreaming extends AbstractProcessor {
         final AtomicInteger successfulRecordCount = new AtomicInteger(0);
         List<HiveStreamingRecord> successfulRecords = new LinkedList<>();
         final FlowFile inputFlowFile = flowFile;
-        final AtomicBoolean incomingFlowFileTransferred = new AtomicBoolean(false);
+        final AtomicBoolean processingFailure = new AtomicBoolean(false);
 
         // Create output flow files and their Avro writers
         AtomicReference<FlowFile> successFlowFile = new AtomicReference<>(session.create(inputFlowFile));
@@ -543,12 +543,10 @@ public class PutHiveStreaming extends AbstractProcessor {
                 } catch (IOException ioe) {
                     // The Avro file is invalid (or may not be an Avro file at all), send it to failure
                     log.error("The incoming flow file can not be read as an Avro file, routing to failure", ioe);
-                    session.transfer(inputFlowFile, REL_FAILURE);
-                    incomingFlowFileTransferred.set(true);
+                    processingFailure.set(true);
                 }
             });
 
-
             if (recordCount.get() > 0) {
                 if (successfulRecordCount.get() > 0) {
                     // Transfer the flow file with successful records
@@ -578,7 +576,9 @@ public class PutHiveStreaming extends AbstractProcessor {
             failureFlowFile.set(null);
 
             // If we got here, we've processed the outgoing flow files correctly, so remove the incoming one if necessary
-            if (!incomingFlowFileTransferred.get()) {
+            if (processingFailure.get()) {
+                session.transfer(inputFlowFile, REL_FAILURE);
+            } else {
                 session.remove(flowFile);
             }
 

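The PutHiveStreaming change above swaps "transfer to failure inside the read callback, then remember not to transfer again" for "record the failure inside the callback, route once afterwards", so all routing of the incoming FlowFile happens in one place after the callback returns. As a general pattern it looks like this sketch (parse() is a hypothetical stand-in for the Avro handling):

    // Record failures inside the stream callback; do the session routing
    // exactly once, after the callback has returned.
    final AtomicBoolean failed = new AtomicBoolean(false);
    session.read(flowFile, in -> {
        try {
            parse(in); // hypothetical processing of the stream
        } catch (final IOException ioe) {
            failed.set(true);
        }
    });
    if (failed.get()) {
        session.transfer(flowFile, REL_FAILURE);
    } else {
        session.remove(flowFile);
    }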
http://git-wip-us.apache.org/repos/asf/nifi/blob/c441a869/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
index f32c301..8cd1a74 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
@@ -69,7 +69,6 @@ public class TestPutHiveStreaming {
     private MockPutHiveStreaming processor;
 
     private KerberosProperties kerberosPropsWithFile;
-    private KerberosProperties kerberosPropsWithoutFile;
 
     @Before
     public void setUp() throws Exception {
@@ -81,8 +80,6 @@ public class TestPutHiveStreaming {
 
         kerberosPropsWithFile = new KerberosProperties(new File("src/test/resources/krb5.conf"));
 
-        kerberosPropsWithoutFile = new KerberosProperties(null);
-
         processor = new MockPutHiveStreaming();
         processor.setKerberosProperties(kerberosPropsWithFile);
         runner = TestRunners.newTestRunner(processor);