You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/01/11 08:48:12 UTC

[nifi] branch main updated: NIFI-9390: Updates to MergeContent / MergeRecord so that they play nicely within Stateless

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 72e54f4  NIFI-9390: Updates to MergeContent / MergeRecord so that they play nicely within Stateless
72e54f4 is described below

commit 72e54f4fab9e8b49a85825f0a85f466e7e2564df
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Jan 4 14:27:22 2022 -0500

    NIFI-9390: Updates to MergeContent / MergeRecord so that they play nicely within Stateless
    
    NIFI-9390: Addressed underlying condition in stateless framework that caused Merge-related processors and similar to not properly be triggered as necessary. Added several system tests to verify different configurations.
    
    NIFI-9390: Simplified the logic for how to iterate over the components in a Stateless flow that are ready to be triggered
    
    This closes #5634.
    
    Co-authored-by: Peter Turcsanyi <tu...@apache.org>
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../apache/nifi/processor/util/bin/BinFiles.java   |   6 +-
 .../nifi/processors/standard/MergeRecord.java      |  18 ++-
 .../standard/merge/RecordBinManager.java           |   4 +
 .../nifi/processors/standard/TestMergeContent.java |  22 ++--
 .../nifi/processors/standard/TestMergeRecord.java  |   8 +-
 .../controller/reporting/LogComponentStatuses.java |   2 +-
 .../engine/StandardExecutionProgress.java          |   2 +-
 .../nifi/stateless/flow/StandardStatelessFlow.java |   1 -
 .../flow/StandardStatelessFlowCurrent.java         |  68 +++++++----
 .../nifi/stateless/flow/StatelessFlowCurrent.java  |   8 ++
 .../session/AsynchronousCommitTracker.java         |  57 ++++++++-
 .../session/TestAsynchronousCommitTracker.java     |   9 +-
 .../apache/nifi/stateless/basics/MergingIT.java    |   2 +-
 .../basics/RequiresAdditionalInputIT.java          | 133 ++++++++++++++++++++-
 .../nifi-system-test-extensions/pom.xml            |   5 +
 .../tests/system/ConcatenateFlowFiles.java         |   2 +
 .../tests/system/ConcatenateRangeOfFlowFiles.java  |  97 +++++++++++++++
 ...oncatenateFlowFiles.java => UpdateContent.java} |  71 ++++++-----
 .../services/org.apache.nifi.processor.Processor   |   2 +
 19 files changed, 421 insertions(+), 96 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index d872d7d..df8d4cf 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -188,7 +188,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
             return;
         }
 
-        final int binsMigrated = migrateBins(context);
+        final int binsMigrated = migrateBins(context, flowFilesBinned == 0);
         final int binsProcessed = processBins(context);
         //If we accomplished nothing then let's yield
         if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
@@ -196,9 +196,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
         }
     }
 
-    private int migrateBins(final ProcessContext context) {
+    private int migrateBins(final ProcessContext context, final boolean relaxFullnessConstraint) {
         int added = 0;
-        for (final Bin bin : binManager.removeReadyBins(true)) {
+        for (final Bin bin : binManager.removeReadyBins(relaxFullnessConstraint)) {
             this.readyBins.add(bin);
             added++;
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index 982303f..ce38273 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -323,12 +323,15 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
             }
         }
 
+        boolean flowFilePolled = false;
         while (isScheduled()) {
             final ProcessSession session = sessionFactory.createSession();
             final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
             if (flowFiles.isEmpty()) {
                 break;
             }
+
+            flowFilePolled = true;
             if (getLogger().isDebugEnabled()) {
                 final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
                 getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
@@ -373,15 +376,20 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
                 getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e, e);
             }
 
-            // Complete any bins that meet their minimum size requirements
             try {
-                manager.completeFullEnoughBins();
+                if (flowFilePolled) {
+                    // At least one new FlowFile was pulled in. Only complete the bins that are entirely full
+                    manager.completeFullBins();
+                } else {
+                    // No FlowFiles available. Complete any bins that meet their minimum size requirements
+                    manager.completeFullEnoughBins();
+
+                    getLogger().debug("No more FlowFiles to bin; will yield");
+                    context.yield();
+                }
             } catch (final Exception e) {
                 getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e, e);
             }
-
-            getLogger().debug("No more FlowFiles to bin; will yield");
-            context.yield();
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index a84e724..ac134e1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -245,6 +245,10 @@ public class RecordBinManager {
         return handleCompletedBins(RecordBin::isFullEnough, "Bin is full enough");
     }
 
+    public int completeFullBins() throws IOException {
+        return handleCompletedBins(RecordBin::isFull, "Bin is completely full");
+    }
+
     private int handleCompletedBins(final Predicate<RecordBin> completionTest, final String completionReason) throws IOException {
         final Map<String, List<RecordBin>> completedBinMap = new HashMap<>();
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 7a20e6d..54a97f2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -485,7 +485,7 @@ public class TestMergeContent {
         runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
 
         createFlowFiles(runner);
-        runner.run();
+        runner.run(2);
 
         runner.assertQueueEmpty();
         runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -531,7 +531,7 @@ public class TestMergeContent {
         runner.setProperty(MergeContent.FOOTER, "$");
 
         createFlowFiles(runner);
-        runner.run();
+        runner.run(2);
 
         runner.assertQueueEmpty();
         runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -544,7 +544,7 @@ public class TestMergeContent {
     }
 
     @Test
-    public void testSimpleBinaryConcatWithTextDelimitersHeaderOnly() throws IOException, InterruptedException {
+    public void testSimpleBinaryConcatWithTextDelimitersHeaderOnly() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
         runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
         runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
@@ -552,7 +552,7 @@ public class TestMergeContent {
         runner.setProperty(MergeContent.HEADER, "@");
 
         createFlowFiles(runner);
-        runner.run();
+        runner.run(2);
 
         runner.assertQueueEmpty();
         runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -584,7 +584,7 @@ public class TestMergeContent {
         runner.enqueue("Hello".getBytes("UTF-8"), attributes);
         runner.enqueue(", ".getBytes("UTF-8"), attributes);
         runner.enqueue("World!".getBytes("UTF-8"), attributes);
-        runner.run();
+        runner.run(2);
 
         runner.assertQueueEmpty();
         runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -654,7 +654,7 @@ public class TestMergeContent {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.MIME_TYPE.key(), "application/zip");
         runner.enqueue(new byte[0], attributes);
-        runner.run();
+        runner.run(2);
 
         runner.assertQueueEmpty();
         runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -733,7 +733,7 @@ public class TestMergeContent {
         runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP);
 
         createFlowFiles(runner);
-        runner.run();
+        runner.run(2);
 
         runner.assertQueueEmpty();
         runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -770,7 +770,7 @@ public class TestMergeContent {
         runner.enqueue("Hello".getBytes("UTF-8"), attributes);
         runner.enqueue(", ".getBytes("UTF-8"), attributes);
         runner.enqueue("World!".getBytes("UTF-8"), attributes);
-        runner.run();
+        runner.run(2);
 
         runner.assertQueueEmpty();
         runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -793,7 +793,7 @@ public class TestMergeContent {
         runner.enqueue(", ".getBytes("UTF-8"), attributes);
         attributes.put(CoreAttributes.FILENAME.key(), "AReallyLongggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggFileName");
         runner.enqueue("World!".getBytes("UTF-8"), attributes);
-        runner.run();
+        runner.run(2);
 
         runner.assertQueueEmpty();
         runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -1108,7 +1108,7 @@ public class TestMergeContent {
         attributes.put("attr", "b");
         runner.enqueue("Panama".getBytes("UTF-8"), attributes);
 
-        runner.run(1);
+        runner.run(2);
 
         runner.assertTransferCount(MergeContent.REL_MERGED, 2);
 
@@ -1231,7 +1231,7 @@ public class TestMergeContent {
         runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
 
         createFlowFiles(runner);
-        runner.run();
+        runner.run(2);
 
         runner.assertQueueEmpty();
         runner.assertTransferCount(MergeContent.REL_MERGED, 1);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index ea3ace6..2749667 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -81,7 +81,7 @@ public class TestMergeRecord {
         runner.enqueue("Name, Age\nJohn, 35");
         runner.enqueue("Name, Age\nJane, 34");
 
-        runner.run(1);
+        runner.run(2);
         runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
 
@@ -110,6 +110,7 @@ public class TestMergeRecord {
         runner.enqueue("Name, Age\nJane, 34");
         runner.enqueue("Name, Color\nJohn, Blue");
 
+        runner.run(1, false, false);
         runner.run(1, true, false);
 
         runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
@@ -339,7 +340,7 @@ public class TestMergeRecord {
         }
         runner.enqueue(sb.toString());
 
-        runner.run();
+        runner.run(2);
 
         runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 3);
@@ -377,7 +378,7 @@ public class TestMergeRecord {
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
 
         runner.enqueue("Name, Age\nJohn, 35");
-        runner.run();
+        runner.run(2);
         runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
     }
@@ -511,6 +512,7 @@ public class TestMergeRecord {
         runner.run(1, false);
 
         Thread.sleep(50L);
+        runner.run(1, false, false);
         runner.run(1, true, false);
 
         runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
index 59155dc..220bbc5 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
@@ -38,7 +38,7 @@ public class LogComponentStatuses implements Runnable {
     private static final Logger logger = LoggerFactory.getLogger(LogComponentStatuses.class);
     private static final int METRIC_CACHE_SECONDS = 300; // FlowFileEvent Repository holds 300 seconds' worth of metrics/events
 
-    private static final String PROCESSOR_LINE_FORMAT = "| %1$-30.30s | %2$-36.36s | %3$-30.30s | %4$28.28s | %5$30.30s | %6$14.14s | %714.14s | %8$28.28s |\n";
+    private static final String PROCESSOR_LINE_FORMAT = "| %1$-30.30s | %2$-36.36s | %3$-30.30s | %4$28.28s | %5$30.30s | %6$14.14s | %7$14.14s | %8$28.28s |\n";
     private static final String COUNTER_LINE_FORMAT = "| %1$-36.36s | %2$-36.36s | %3$28.28s | %4$28.28s |\n";
 
     private final FlowFileEventRepository flowFileEventRepository;
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
index 4502be2..544689c 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
@@ -109,7 +109,7 @@ public class StandardExecutionProgress implements ExecutionProgress {
     @Override
     public boolean isDataQueued() {
         for (final FlowFileQueue queue : internalFlowFileQueues) {
-            if (!queue.isActiveQueueEmpty()) {
+            if (!queue.isActiveQueueEmpty() || queue.isUnacknowledgedFlowFile()) {
                 return true;
             }
         }
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index a64aa57..a107a6a 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -139,7 +139,6 @@ public class StandardStatelessFlow implements StatelessDataflow {
         internalFlowFileQueues = discoverInternalFlowFileQueues(rootGroup);
     }
 
-
     private List<FlowFileQueue> discoverInternalFlowFileQueues(final ProcessGroup group) {
         final Set<Port> rootGroupInputPorts = rootGroup.getInputPorts();
         final Set<Port> rootGroupOutputPorts = rootGroup.getOutputPorts();
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
index 099f9f8..2767653 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
@@ -19,6 +19,7 @@ package org.apache.nifi.stateless.flow;
 
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
 import org.apache.nifi.groups.FlowFileOutboundPolicy;
 import org.apache.nifi.processor.ProcessContext;
@@ -32,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 
@@ -57,10 +57,6 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
         this.processContextFactory = builder.processContextFactory;
     }
 
-    public Connectable getCurrentComponent() {
-        return currentComponent;
-    }
-
     @Override
     public void triggerFlow() {
         try {
@@ -68,30 +64,31 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
             while (!completionReached) {
                 triggerRootConnectables();
 
-                NextConnectable nextConnectable = NextConnectable.NEXT_READY;
-                while (tracker.isAnyReady() && nextConnectable == NextConnectable.NEXT_READY) {
-                    final List<Connectable> next = tracker.getReady();
-                    logger.debug("The following {} components are ready to be triggered: {}", next.size(), next);
+                while (tracker.isAnyReady()) {
+                    final Connectable connectable = tracker.getNextReady();
+                    logger.debug("The next ready component to be triggered: {}", connectable);
 
-                    for (final Connectable connectable : next) {
-                        nextConnectable = triggerWhileReady(connectable);
+                    // Continually trigger the given component as long as it is ready to be triggered
+                    final NextConnectable nextConnectable = triggerWhileReady(connectable);
 
-                        // If there's nothing left to do, return
-                        if (nextConnectable == NextConnectable.NONE) {
-                            return;
-                        }
-
-                        // If next connectable is whatever is ready, just continue loop
-                        if (nextConnectable == NextConnectable.NEXT_READY) {
-                            continue;
-                        }
+                    // If there's nothing left to do, return
+                    if (nextConnectable == NextConnectable.NONE) {
+                        return;
+                    }
 
-                        // Otherwise, we need to break out of this loop so that we can trigger root connectables or complete dataflow
-                        break;
+                    // If next connectable is whatever is ready, just continue loop
+                    if (nextConnectable == NextConnectable.NEXT_READY) {
+                        continue;
                     }
+
+                    // Otherwise, we need to break out of this loop so that we can trigger root connectables or complete dataflow
+                    break;
                 }
 
-                completionReached = !tracker.isAnyReady();
+
+                // We have reached completion if the tracker does not know of any components ready to be triggered AND
+                // we have no data queued in the flow (with the exception of Output Ports).
+                completionReached = !tracker.isAnyReady() && isFlowQueueEmpty();
             }
         } catch (final Throwable t) {
             if (t instanceof TerminatedTaskException) {
@@ -106,6 +103,29 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
         }
     }
 
+    /**
+     * Returns <code>true</code> if all data in the flow has been fully processed. This includes both 'internal queues'
+     * that are available via the executionProgress, as well as considering any data that has been consumed from the queues by
+     * the 'rootConnectables' that has not yet completed processing
+     *
+     * @return <code>true</code> if all FlowFiles have completed processing and no data is available, <code>false</code> otherwise
+     */
+    private boolean isFlowQueueEmpty() {
+        if (executionProgress.isDataQueued()) {
+            return false;
+        }
+
+        for (final Connectable rootConnectable : rootConnectables) {
+            for (final Connection connection : rootConnectable.getIncomingConnections()) {
+                if (connection.getFlowFileQueue().isUnacknowledgedFlowFile()) {
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
     private void triggerRootConnectables() {
         for (final Connectable connectable : rootConnectables) {
             currentComponent = connectable;
@@ -202,7 +222,7 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
 
         SOURCE_CONNECTABLE,
 
-        NONE;
+        NONE
     }
 
     public static class Builder {
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StatelessFlowCurrent.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StatelessFlowCurrent.java
index 9accf7c..aaf2412 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StatelessFlowCurrent.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StatelessFlowCurrent.java
@@ -17,6 +17,14 @@
 
 package org.apache.nifi.stateless.flow;
 
+/**
+ * The StatelessFlowCurrent is responsible for facilitating the flow of data. Just as a current (of air, water, etc.)
+ * is the body that moves, the StatelessFlowCurrent is the piece of the Stateless framework that deals with the movement of data.
+ */
 public interface StatelessFlowCurrent {
+    /**
+     * Triggers the dataflow, starting from 'root' or 'source' components all the way through the end of the dataflow until either
+     * the source components provide no data or all data that is provided is processed.
+     */
     void triggerFlow();
 }
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java
index 1cfe659..15a5b4d 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java
@@ -63,19 +63,44 @@ public class AsynchronousCommitTracker {
         }
     }
 
+
+    public Connectable getNextReady() {
+        if (ready.isEmpty()) {
+            return null;
+        }
+
+        Connectable last = null;
+        for (final Connectable connectable : ready) {
+            last = connectable;
+        }
+
+        return last;
+    }
+
     public List<Connectable> getReady() {
-        final List<Connectable> connectables = new ArrayList<>(ready);
-        Collections.reverse(connectables);
-        return connectables;
+        final List<Connectable> reversed = new ArrayList<>(ready);
+        Collections.reverse(reversed);
+        return reversed;
     }
 
+    /**
+     * Determines if there are any components that may be ready to be triggered. Note that a value of <code>true</code> may be returned, even if there are no components
+     * that currently are ready according to {@link #isReady(Connectable)}.
+     *
+     * @return <code>true</code> if any component is expected to be ready to trigger, <code>false</code> otherwise
+     */
     public boolean isAnyReady() {
         final boolean anyReady = !ready.isEmpty();
-
         logger.debug("{} Any components ready = {}, list={}", this, anyReady, ready);
         return anyReady;
     }
 
+    /**
+     * Checks if the given component is ready to be triggered and, if not, removes the component from the internal list of ready components
+     *
+     * @param connectable the component to check
+     * @return <code>true</code> if the component is ready to be triggered, <code>false</code> otherwise
+     */
     public boolean isReady(final Connectable connectable) {
         if (!ready.contains(connectable)) {
             logger.debug("{} {} is not ready because it's not in the list of ready components", this, connectable);
@@ -94,7 +119,12 @@ public class AsynchronousCommitTracker {
             return true;
         }
 
-        logger.debug("{} {} is not ready because it has no data queued", this, connectable);
+        if (connectable.isTriggerWhenEmpty() && isDataHeld(connectable)) {
+            logger.debug("{} {} is ready because it is triggered when its input queue is empty and has unacknowledged data", this, connectable);
+            return true;
+        }
+
+        logger.debug("{} {} is not ready because it has no data queued or held (or has no data queued and is not to be triggered when input queue is empty)", this, connectable);
         ready.remove(connectable);
         return false;
     }
@@ -111,7 +141,22 @@ public class AsynchronousCommitTracker {
 
     private boolean isDataQueued(final Connectable connectable) {
         for (final Connection incoming : connectable.getIncomingConnections()) {
-            if (!incoming.getFlowFileQueue().isEmpty()) {
+            if (!incoming.getFlowFileQueue().isActiveQueueEmpty()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Determines if data is currently being held by the given connectable (i.e., it has at least one incoming Connection with unacknowledged FlowFiles)
+     * @param connectable the connectable to check
+     * @return <code>true</code> if the Connectable is holding onto data, <code>false</code> otherwise
+     */
+    private boolean isDataHeld(final Connectable connectable) {
+        for (final Connection incoming : connectable.getIncomingConnections()) {
+            if (incoming.getFlowFileQueue().isUnacknowledgedFlowFile()) {
                 return true;
             }
         }
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/session/TestAsynchronousCommitTracker.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/session/TestAsynchronousCommitTracker.java
index abdca73..78bdbe8 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/session/TestAsynchronousCommitTracker.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/session/TestAsynchronousCommitTracker.java
@@ -44,20 +44,25 @@ public class TestAsynchronousCommitTracker {
 
         tracker.addConnectable(connectable1);
         assertEquals(Collections.singletonList(connectable1), tracker.getReady());
+        assertEquals(connectable1, tracker.getNextReady());
 
         tracker.addConnectable(connectable2);
         assertEquals(Arrays.asList(connectable2, connectable1), tracker.getReady());
+        assertEquals(connectable2, tracker.getNextReady());
 
         tracker.addConnectable(connectable3);
         assertEquals(Arrays.asList(connectable3, connectable2, connectable1), tracker.getReady());
+        assertEquals(connectable3, tracker.getNextReady());
 
         // connectable1 should now be moved to the start of the List
         tracker.addConnectable(connectable1);
         assertEquals(Arrays.asList(connectable1, connectable3, connectable2), tracker.getReady());
+        assertEquals(connectable1, tracker.getNextReady());
 
         // Adding connectable1 again should now have effect since it is already first
         tracker.addConnectable(connectable1);
         assertEquals(Arrays.asList(connectable1, connectable3, connectable2), tracker.getReady());
+        assertEquals(connectable1, tracker.getNextReady());
     }
 
     @Test
@@ -86,7 +91,7 @@ public class TestAsynchronousCommitTracker {
         final Connection connection = Mockito.mock(Connection.class);
         final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
         Mockito.when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
-        Mockito.when(flowFileQueue.isEmpty()).thenReturn(false);
+        Mockito.when(flowFileQueue.isActiveQueueEmpty()).thenReturn(false);
         Mockito.when(connectable2.getIncomingConnections()).thenReturn(Collections.singletonList(connection));
 
         assertTrue(tracker.isReady(connectable2));
@@ -95,7 +100,7 @@ public class TestAsynchronousCommitTracker {
 
         // If we then indicate that the FlowFileQueue is empty, we should see that Connectable2 is no longer ready and it should be evicted from the collection of ready components.
         // This should then also result in isAnyReady() being false.
-        Mockito.when(flowFileQueue.isEmpty()).thenReturn(true);
+        Mockito.when(flowFileQueue.isActiveQueueEmpty()).thenReturn(true);
         assertFalse(tracker.isReady(connectable2));
         assertFalse(tracker.isAnyReady());
         assertEquals(Collections.emptyList(), tracker.getReady());
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/MergingIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/MergingIT.java
index 25e0e3e..098ca99 100644
--- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/MergingIT.java
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/MergingIT.java
@@ -17,9 +17,9 @@
 
 package org.apache.nifi.stateless.basics;
 
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flow.VersionedPort;
 import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.stateless.StatelessSystemIT;
 import org.apache.nifi.stateless.VersionedFlowBuilder;
 import org.apache.nifi.stateless.config.StatelessConfigurationException;
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/RequiresAdditionalInputIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/RequiresAdditionalInputIT.java
index 519b804..4fed602 100644
--- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/RequiresAdditionalInputIT.java
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/RequiresAdditionalInputIT.java
@@ -17,10 +17,11 @@
 
 package org.apache.nifi.stateless.basics;
 
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.flow.VersionedPort;
 import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.stateless.StatelessSystemIT;
 import org.apache.nifi.stateless.VersionedFlowBuilder;
 import org.apache.nifi.stateless.config.StatelessConfigurationException;
@@ -28,11 +29,15 @@ import org.apache.nifi.stateless.flow.DataflowTrigger;
 import org.apache.nifi.stateless.flow.StatelessDataflow;
 import org.apache.nifi.stateless.flow.TransactionThresholds;
 import org.apache.nifi.stateless.flow.TriggerResult;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -46,6 +51,130 @@ import static org.junit.Assert.assertTrue;
 public class RequiresAdditionalInputIT extends StatelessSystemIT {
 
     @Test
+    public void testMergeAsFirstProcessor() throws IOException, StatelessConfigurationException, InterruptedException {
+        // Build the flow: In -> ConcatenateRangeOfFlowFiles -> Out, auto-terminating "original" and "failure"
+        final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+        final VersionedPort inPort = flowBuilder.createInputPort("In");
+        final VersionedPort outPort = flowBuilder.createOutputPort("Out");
+        final VersionedProcessor merge = flowBuilder.createSimpleProcessor("ConcatenateRangeOfFlowFiles");
+        merge.setAutoTerminatedRelationships(new HashSet<>(Arrays.asList("original", "failure")));
+
+        flowBuilder.createConnection(inPort, merge, Relationship.ANONYMOUS.getName());
+        flowBuilder.createConnection(merge, outPort, "merged");
+
+        // Startup the dataflow
+        final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList(), Collections.emptySet(), createTransactionThresholds(1000));
+
+        // Enqueue three FlowFiles with contents "1", "2" and "3", then trigger
+        for (int i=1; i <= 3; i++) {
+            dataflow.enqueue(String.valueOf(i).getBytes(StandardCharsets.UTF_8), Collections.emptyMap(), "In");
+        }
+
+        final DataflowTrigger trigger = dataflow.trigger();
+        final TriggerResult result = trigger.getResult();
+        assertTrue(result.isSuccessful());
+
+        // Expect a single output FlowFile; decode it as UTF-8 (the charset used when enqueueing), not the platform default
+        final List<FlowFile> flowFiles = result.getOutputFlowFiles("Out");
+        Assert.assertEquals(1, flowFiles.size());
+        final FlowFile first = flowFiles.get(0);
+        final String outputContent = new String(result.readContentAsByteArray(first), StandardCharsets.UTF_8);
+        Assert.assertEquals("123", outputContent);
+
+        result.acknowledge();
+    }
+
+
+    @Test
+    public void testMergeAsFirstProcessorWithoutEnoughData() throws IOException, StatelessConfigurationException, InterruptedException {
+        // Flow under test: In -> ConcatenateRangeOfFlowFiles (Minimum Number of Entries = 100) -> Out
+        final VersionedFlowBuilder builder = new VersionedFlowBuilder();
+        final VersionedPort input = builder.createInputPort("In");
+        final VersionedPort output = builder.createOutputPort("Out");
+
+        final VersionedProcessor concatenate = builder.createSimpleProcessor("ConcatenateRangeOfFlowFiles");
+        concatenate.setProperties(Collections.singletonMap("Minimum Number of Entries", "100"));
+        concatenate.setAutoTerminatedRelationships(new HashSet<>(Arrays.asList("original", "failure")));
+
+        builder.createConnection(input, concatenate, Relationship.ANONYMOUS.getName());
+        builder.createConnection(concatenate, output, "merged");
+
+        final StatelessDataflow statelessFlow = loadDataflow(builder.getFlowSnapshot(), Collections.emptyList(), Collections.emptySet(), createTransactionThresholds(1000));
+
+        // Provide only three FlowFiles ("1", "2", "3") before triggering the flow
+        for (int index = 0; index < 3; index++) {
+            final byte[] payload = String.valueOf(index + 1).getBytes(StandardCharsets.UTF_8);
+            statelessFlow.enqueue(payload, Collections.emptyMap(), "In");
+        }
+
+        // No result is expected within the two-second wait, so the Optional must be empty
+        final DataflowTrigger flowTrigger = statelessFlow.trigger();
+        final Optional<TriggerResult> possibleResult = flowTrigger.getResult(2, TimeUnit.SECONDS);
+        assertFalse(possibleResult.isPresent());
+
+        flowTrigger.cancel();
+    }
+
+
+    @Test
+    public void testMergeDownstream() throws IOException, StatelessConfigurationException, InterruptedException {
+        // Flow under test: In -> UpdateContent x3 (appending "\n1", "\n2", "\n3") -> ConcatenateRangeOfFlowFiles -> Out
+        final VersionedFlowBuilder builder = new VersionedFlowBuilder();
+        final VersionedPort input = builder.createInputPort("In");
+        final VersionedPort output = builder.createOutputPort("Out");
+
+        final VersionedProcessor appendOne = builder.createSimpleProcessor("UpdateContent");
+        final Map<String, String> appendOneProperties = new HashMap<>();
+        appendOneProperties.put("Update Strategy", "Append");
+        appendOneProperties.put("Content", "\n1");
+        appendOne.setProperties(appendOneProperties);
+
+        final VersionedProcessor appendTwo = builder.createSimpleProcessor("UpdateContent");
+        final Map<String, String> appendTwoProperties = new HashMap<>();
+        appendTwoProperties.put("Update Strategy", "Append");
+        appendTwoProperties.put("Content", "\n2");
+        appendTwo.setProperties(appendTwoProperties);
+
+        final VersionedProcessor appendThree = builder.createSimpleProcessor("UpdateContent");
+        final Map<String, String> appendThreeProperties = new HashMap<>();
+        appendThreeProperties.put("Update Strategy", "Append");
+        appendThreeProperties.put("Content", "\n3");
+        appendThree.setProperties(appendThreeProperties);
+
+        final VersionedProcessor concatenate = builder.createSimpleProcessor("ConcatenateRangeOfFlowFiles");
+        concatenate.setAutoTerminatedRelationships(new HashSet<>(Arrays.asList("original", "failure")));
+
+        builder.createConnection(input, appendOne, Relationship.ANONYMOUS.getName());
+        builder.createConnection(appendOne, appendTwo, "success");
+        builder.createConnection(appendTwo, appendThree, "success");
+        builder.createConnection(appendThree, concatenate, "success");
+        builder.createConnection(concatenate, output, "merged");
+
+        final StatelessDataflow statelessFlow = loadDataflow(builder.getFlowSnapshot(), Collections.emptyList(), Collections.emptySet(), createTransactionThresholds(1000));
+
+        // Feed "hello 1" .. "hello 3" into the flow and run it
+        for (int index = 1; index <= 3; index++) {
+            statelessFlow.enqueue(("hello " + index).getBytes(StandardCharsets.UTF_8), Collections.emptyMap(), "In");
+        }
+
+        final DataflowTrigger flowTrigger = statelessFlow.trigger();
+        final TriggerResult triggerResult = flowTrigger.getResult();
+        assertTrue(triggerResult.isSuccessful());
+
+        final List<FlowFile> outputFlowFiles = triggerResult.getOutputFlowFiles("Out");
+        assertEquals(1, outputFlowFiles.size());
+        final String actualText = new String(triggerResult.readContentAsByteArray(outputFlowFiles.get(0)), StandardCharsets.UTF_8);
+
+        // The single output is expected to hold each input followed by the three appended suffixes, in order
+        final String appendedSuffix = "\n1\n2\n3";
+        final StringBuilder expectedText = new StringBuilder();
+        for (int index = 1; index <= 3; index++) {
+            expectedText.append("hello ").append(index).append(appendedSuffix);
+        }
+        assertEquals(expectedText.toString(), actualText);
+    }
+
+    @Test
     public void testSourceProcessorsTriggeredAsOftenAsRequired() throws IOException, StatelessConfigurationException, InterruptedException {
         // Build the flow
         final int flowFileCount = 12;
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/pom.xml b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/pom.xml
index 8fa0006..242730b 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/pom.xml
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/pom.xml
@@ -39,6 +39,11 @@
             <artifactId>nifi-utils</artifactId>
             <version>1.16.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+            <version>1.16.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java
index 0420d49..b2ed768 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.tests.system;
 
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.flowfile.FlowFile;
@@ -36,6 +37,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+@TriggerWhenEmpty
 public class ConcatenateFlowFiles extends AbstractProcessor {
     static final PropertyDescriptor FLOWFILE_COUNT = new Builder()
         .name("FlowFile Count")
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateRangeOfFlowFiles.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateRangeOfFlowFiles.java
new file mode 100644
index 0000000..cefc4e6
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateRangeOfFlowFiles.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.bin.Bin;
+import org.apache.nifi.processor.util.bin.BinFiles;
+import org.apache.nifi.processor.util.bin.BinManager;
+import org.apache.nifi.processor.util.bin.BinProcessingResult;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * System-test processor based on {@link BinFiles}: each bin handed to {@code processBin} is merged into one FlowFile sent to "merged".
+ */
+@TriggerSerially
+@TriggerWhenEmpty
+public class ConcatenateRangeOfFlowFiles extends BinFiles {
+    public static final Relationship REL_MERGED =
+        new Relationship.Builder().name("merged").description("The FlowFile containing the merged content").build();
+
+    // Exposes "merged" alongside REL_ORIGINAL and REL_FAILURE
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> supported = new HashSet<>();
+        supported.add(REL_MERGED);
+        supported.add(REL_FAILURE);
+        supported.add(REL_ORIGINAL);
+        return supported;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> supported = new ArrayList<>();
+        supported.add(MIN_ENTRIES);
+        supported.add(MAX_ENTRIES);
+        supported.add(MIN_SIZE);
+        supported.add(MAX_SIZE);
+        supported.add(MAX_BIN_AGE);
+        supported.add(MAX_BIN_COUNT);
+        return supported;
+    }
+
+    @Override
+    protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+        return flowFile; // no preprocessing: the FlowFile is handed back unchanged
+    }
+
+    @Override
+    protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) {
+        return null; // the same (null) group id is reported for every FlowFile
+    }
+
+    @Override
+    protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
+        // Intentionally empty: the BinManager is not customized here
+    }
+
+    @Override
+    protected BinProcessingResult processBin(final Bin bin, final ProcessContext context) throws ProcessException {
+        final ProcessSession binSession = bin.getSession();
+        final List<FlowFile> binned = bin.getContents();
+        final FlowFile merged = binSession.merge(binned, binSession.create(binned));
+        binSession.transfer(merged, REL_MERGED);
+
+        getLogger().info("Concatenated {} FlowFiles into {}", binned.size(), merged);
+
+        final BinProcessingResult outcome = new BinProcessingResult(true);
+        outcome.setCommitted(false);
+        return outcome;
+    }
+}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateContent.java
similarity index 51%
copy from nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java
copy to nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateContent.java
index 0420d49..2b5da94 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateContent.java
@@ -19,73 +19,72 @@ package org.apache.nifi.processors.tests.system;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.StreamUtils;
 
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-public class ConcatenateFlowFiles extends AbstractProcessor {
-    static final PropertyDescriptor FLOWFILE_COUNT = new Builder()
-        .name("FlowFile Count")
-        .displayName("FlowFile Count")
-        .description("Number of FlowFiles to concatenate together")
+public class UpdateContent extends AbstractProcessor {
+
+    static final PropertyDescriptor CONTENT = new Builder()
+        .name("Content")
+        .displayName("Content")
+        .description("Content to set")
         .required(true)
-        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .addValidator(Validator.VALID)
+        .defaultValue("Default Content")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
-
-    static final Relationship ORIGINAL = new Relationship.Builder()
-        .name("original")
+    static final PropertyDescriptor UPDATE_STRATEGY = new Builder()
+        .name("Update Strategy")
+        .displayName("Update Strategy")
+        .description("How to update the contents")
+        .required(true)
+        .allowableValues("Replace", "Append")
+        .defaultValue("Replace")
         .build();
-    static final Relationship MERGED = new Relationship.Builder()
-        .name("merged")
+
+    private static final Relationship REL_SUCCESS = new Relationship.Builder() // static: one shared constant, like CONTENT and UPDATE_STRATEGY
+        .name("success")
         .build();
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return Collections.singletonList(FLOWFILE_COUNT);
+        return Arrays.asList(CONTENT, UPDATE_STRATEGY);
     }
 
     @Override
     public Set<Relationship> getRelationships() {
-        return new HashSet<>(Arrays.asList(ORIGINAL, MERGED));
+        return Collections.singleton(REL_SUCCESS);
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final int flowFileCount = context.getProperty(FLOWFILE_COUNT).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-        if (flowFiles.size() != flowFileCount) {
-            session.rollback();
-            context.yield();
-            getLogger().debug("Need {} FlowFiles but currently on {} are available. Will not merge.", flowFileCount, flowFiles.size());
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
             return;
         }
 
-        FlowFile merged = session.create(flowFiles);
-        try (final OutputStream out = session.write(merged)) {
-            for (final FlowFile input : flowFiles) {
-                try (final InputStream in = session.read(input)) {
-                    StreamUtils.copy(in, out);
-                }
-            }
-        } catch (final Exception e) {
-            throw new ProcessException("Failed to merge", e);
+        final String content = context.getProperty(CONTENT).evaluateAttributeExpressions(flowFile).getValue();
+        final byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
+
+        final String strategy = context.getProperty(UPDATE_STRATEGY).getValue();
+        if (strategy.equalsIgnoreCase("Replace")) {
+            session.write(flowFile, out -> out.write(contentBytes));
+        } else {
+            session.append(flowFile, out -> out.write(contentBytes));
         }
 
-        session.transfer(merged, MERGED);
-        session.transfer(flowFiles, ORIGINAL);
+        session.transfer(flowFile, REL_SUCCESS);
     }
-
 }
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index c39c5bf..77143b1 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,6 +16,7 @@
 org.apache.nifi.processors.tests.system.CountEvents
 org.apache.nifi.processors.tests.system.CountFlowFiles
 org.apache.nifi.processors.tests.system.ConcatenateFlowFiles
+org.apache.nifi.processors.tests.system.ConcatenateRangeOfFlowFiles
 org.apache.nifi.processors.tests.system.DependOnProperties
 org.apache.nifi.processors.tests.system.DoNotTransferFlowFile
 org.apache.nifi.processors.tests.system.Duplicate
@@ -38,6 +39,7 @@ org.apache.nifi.processors.tests.system.SplitByLine
 org.apache.nifi.processors.tests.system.TerminateFlowFile
 org.apache.nifi.processors.tests.system.TransferBatch
 org.apache.nifi.processors.tests.system.ThrowProcessException
+org.apache.nifi.processors.tests.system.UpdateContent
 org.apache.nifi.processors.tests.system.ValidateFileExists
 org.apache.nifi.processors.tests.system.VerifyContents
 org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile