You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/01/11 08:48:12 UTC
[nifi] branch main updated: NIFI-9390: Updates to MergeContent / MergeRecord so that they play nicely within Stateless
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 72e54f4 NIFI-9390: Updates to MergeContent / MergeRecord so that they play nicely within Stateless
72e54f4 is described below
commit 72e54f4fab9e8b49a85825f0a85f466e7e2564df
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Jan 4 14:27:22 2022 -0500
NIFI-9390: Updates to MergeContent / MergeRecord so that they play nicely within Stateless
NIFI-9390: Addressed underlying condition in stateless framework that caused Merge-related processors and similar to not properly be triggered as necessary. Added several system tests to verify different configurations.
NIFI-9390: Simplified the logic for how to iterate over the components in a Stateless flow that are ready to be triggered
This closes #5634.
Co-authored-by: Peter Turcsanyi <tu...@apache.org>
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../apache/nifi/processor/util/bin/BinFiles.java | 6 +-
.../nifi/processors/standard/MergeRecord.java | 18 ++-
.../standard/merge/RecordBinManager.java | 4 +
.../nifi/processors/standard/TestMergeContent.java | 22 ++--
.../nifi/processors/standard/TestMergeRecord.java | 8 +-
.../controller/reporting/LogComponentStatuses.java | 2 +-
.../engine/StandardExecutionProgress.java | 2 +-
.../nifi/stateless/flow/StandardStatelessFlow.java | 1 -
.../flow/StandardStatelessFlowCurrent.java | 68 +++++++----
.../nifi/stateless/flow/StatelessFlowCurrent.java | 8 ++
.../session/AsynchronousCommitTracker.java | 57 ++++++++-
.../session/TestAsynchronousCommitTracker.java | 9 +-
.../apache/nifi/stateless/basics/MergingIT.java | 2 +-
.../basics/RequiresAdditionalInputIT.java | 133 ++++++++++++++++++++-
.../nifi-system-test-extensions/pom.xml | 5 +
.../tests/system/ConcatenateFlowFiles.java | 2 +
.../tests/system/ConcatenateRangeOfFlowFiles.java | 97 +++++++++++++++
...oncatenateFlowFiles.java => UpdateContent.java} | 71 ++++++-----
.../services/org.apache.nifi.processor.Processor | 2 +
19 files changed, 421 insertions(+), 96 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index d872d7d..df8d4cf 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -188,7 +188,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
return;
}
- final int binsMigrated = migrateBins(context);
+ final int binsMigrated = migrateBins(context, flowFilesBinned == 0);
final int binsProcessed = processBins(context);
//If we accomplished nothing then let's yield
if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
@@ -196,9 +196,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
}
}
- private int migrateBins(final ProcessContext context) {
+ private int migrateBins(final ProcessContext context, final boolean relaxFullnessConstraint) {
int added = 0;
- for (final Bin bin : binManager.removeReadyBins(true)) {
+ for (final Bin bin : binManager.removeReadyBins(relaxFullnessConstraint)) {
this.readyBins.add(bin);
added++;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index 982303f..ce38273 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -323,12 +323,15 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
}
}
+ boolean flowFilePolled = false;
while (isScheduled()) {
final ProcessSession session = sessionFactory.createSession();
final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
if (flowFiles.isEmpty()) {
break;
}
+
+ flowFilePolled = true;
if (getLogger().isDebugEnabled()) {
final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
@@ -373,15 +376,20 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e, e);
}
- // Complete any bins that meet their minimum size requirements
try {
- manager.completeFullEnoughBins();
+ if (flowFilePolled) {
+ // At least one new FlowFile was pulled in. Only complete the bins that are entirely full
+ manager.completeFullBins();
+ } else {
+ // No FlowFiles available. Complete any bins that meet their minimum size requirements
+ manager.completeFullEnoughBins();
+
+ getLogger().debug("No more FlowFiles to bin; will yield");
+ context.yield();
+ }
} catch (final Exception e) {
getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e, e);
}
-
- getLogger().debug("No more FlowFiles to bin; will yield");
- context.yield();
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index a84e724..ac134e1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -245,6 +245,10 @@ public class RecordBinManager {
return handleCompletedBins(RecordBin::isFullEnough, "Bin is full enough");
}
+ public int completeFullBins() throws IOException {
+ return handleCompletedBins(RecordBin::isFull, "Bin is completely full");
+ }
+
private int handleCompletedBins(final Predicate<RecordBin> completionTest, final String completionReason) throws IOException {
final Map<String, List<RecordBin>> completedBinMap = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 7a20e6d..54a97f2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -485,7 +485,7 @@ public class TestMergeContent {
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
createFlowFiles(runner);
- runner.run();
+ runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -531,7 +531,7 @@ public class TestMergeContent {
runner.setProperty(MergeContent.FOOTER, "$");
createFlowFiles(runner);
- runner.run();
+ runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -544,7 +544,7 @@ public class TestMergeContent {
}
@Test
- public void testSimpleBinaryConcatWithTextDelimitersHeaderOnly() throws IOException, InterruptedException {
+ public void testSimpleBinaryConcatWithTextDelimitersHeaderOnly() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
@@ -552,7 +552,7 @@ public class TestMergeContent {
runner.setProperty(MergeContent.HEADER, "@");
createFlowFiles(runner);
- runner.run();
+ runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -584,7 +584,7 @@ public class TestMergeContent {
runner.enqueue("Hello".getBytes("UTF-8"), attributes);
runner.enqueue(", ".getBytes("UTF-8"), attributes);
runner.enqueue("World!".getBytes("UTF-8"), attributes);
- runner.run();
+ runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -654,7 +654,7 @@ public class TestMergeContent {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/zip");
runner.enqueue(new byte[0], attributes);
- runner.run();
+ runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -733,7 +733,7 @@ public class TestMergeContent {
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP);
createFlowFiles(runner);
- runner.run();
+ runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -770,7 +770,7 @@ public class TestMergeContent {
runner.enqueue("Hello".getBytes("UTF-8"), attributes);
runner.enqueue(", ".getBytes("UTF-8"), attributes);
runner.enqueue("World!".getBytes("UTF-8"), attributes);
- runner.run();
+ runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -793,7 +793,7 @@ public class TestMergeContent {
runner.enqueue(", ".getBytes("UTF-8"), attributes);
attributes.put(CoreAttributes.FILENAME.key(), "AReallyLongggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggFileName");
runner.enqueue("World!".getBytes("UTF-8"), attributes);
- runner.run();
+ runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
@@ -1108,7 +1108,7 @@ public class TestMergeContent {
attributes.put("attr", "b");
runner.enqueue("Panama".getBytes("UTF-8"), attributes);
- runner.run(1);
+ runner.run(2);
runner.assertTransferCount(MergeContent.REL_MERGED, 2);
@@ -1231,7 +1231,7 @@ public class TestMergeContent {
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
createFlowFiles(runner);
- runner.run();
+ runner.run(2);
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index ea3ace6..2749667 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -81,7 +81,7 @@ public class TestMergeRecord {
runner.enqueue("Name, Age\nJohn, 35");
runner.enqueue("Name, Age\nJane, 34");
- runner.run(1);
+ runner.run(2);
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
@@ -110,6 +110,7 @@ public class TestMergeRecord {
runner.enqueue("Name, Age\nJane, 34");
runner.enqueue("Name, Color\nJohn, Blue");
+ runner.run(1, false, false);
runner.run(1, true, false);
runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
@@ -339,7 +340,7 @@ public class TestMergeRecord {
}
runner.enqueue(sb.toString());
- runner.run();
+ runner.run(2);
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 3);
@@ -377,7 +378,7 @@ public class TestMergeRecord {
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
runner.enqueue("Name, Age\nJohn, 35");
- runner.run();
+ runner.run(2);
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
}
@@ -511,6 +512,7 @@ public class TestMergeRecord {
runner.run(1, false);
Thread.sleep(50L);
+ runner.run(1, false, false);
runner.run(1, true, false);
runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
index 59155dc..220bbc5 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
@@ -38,7 +38,7 @@ public class LogComponentStatuses implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(LogComponentStatuses.class);
private static final int METRIC_CACHE_SECONDS = 300; // FlowFileEvent Repository holds 300 seconds' worth of metrics/events
- private static final String PROCESSOR_LINE_FORMAT = "| %1$-30.30s | %2$-36.36s | %3$-30.30s | %4$28.28s | %5$30.30s | %6$14.14s | %714.14s | %8$28.28s |\n";
+ private static final String PROCESSOR_LINE_FORMAT = "| %1$-30.30s | %2$-36.36s | %3$-30.30s | %4$28.28s | %5$30.30s | %6$14.14s | %7$14.14s | %8$28.28s |\n";
private static final String COUNTER_LINE_FORMAT = "| %1$-36.36s | %2$-36.36s | %3$28.28s | %4$28.28s |\n";
private final FlowFileEventRepository flowFileEventRepository;
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
index 4502be2..544689c 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
@@ -109,7 +109,7 @@ public class StandardExecutionProgress implements ExecutionProgress {
@Override
public boolean isDataQueued() {
for (final FlowFileQueue queue : internalFlowFileQueues) {
- if (!queue.isActiveQueueEmpty()) {
+ if (!queue.isActiveQueueEmpty() || queue.isUnacknowledgedFlowFile()) {
return true;
}
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index a64aa57..a107a6a 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -139,7 +139,6 @@ public class StandardStatelessFlow implements StatelessDataflow {
internalFlowFileQueues = discoverInternalFlowFileQueues(rootGroup);
}
-
private List<FlowFileQueue> discoverInternalFlowFileQueues(final ProcessGroup group) {
final Set<Port> rootGroupInputPorts = rootGroup.getInputPorts();
final Set<Port> rootGroupOutputPorts = rootGroup.getOutputPorts();
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
index 099f9f8..2767653 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
@@ -19,6 +19,7 @@ package org.apache.nifi.stateless.flow;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.processor.ProcessContext;
@@ -32,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -57,10 +57,6 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
this.processContextFactory = builder.processContextFactory;
}
- public Connectable getCurrentComponent() {
- return currentComponent;
- }
-
@Override
public void triggerFlow() {
try {
@@ -68,30 +64,31 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
while (!completionReached) {
triggerRootConnectables();
- NextConnectable nextConnectable = NextConnectable.NEXT_READY;
- while (tracker.isAnyReady() && nextConnectable == NextConnectable.NEXT_READY) {
- final List<Connectable> next = tracker.getReady();
- logger.debug("The following {} components are ready to be triggered: {}", next.size(), next);
+ while (tracker.isAnyReady()) {
+ final Connectable connectable = tracker.getNextReady();
+ logger.debug("The next ready component to be triggered: {}", connectable);
- for (final Connectable connectable : next) {
- nextConnectable = triggerWhileReady(connectable);
+ // Continually trigger the given component as long as it is ready to be triggered
+ final NextConnectable nextConnectable = triggerWhileReady(connectable);
- // If there's nothing left to do, return
- if (nextConnectable == NextConnectable.NONE) {
- return;
- }
-
- // If next connectable is whatever is ready, just continue loop
- if (nextConnectable == NextConnectable.NEXT_READY) {
- continue;
- }
+ // If there's nothing left to do, return
+ if (nextConnectable == NextConnectable.NONE) {
+ return;
+ }
- // Otherwise, we need to break out of this loop so that we can trigger root connectables or complete dataflow
- break;
+ // If next connectable is whatever is ready, just continue loop
+ if (nextConnectable == NextConnectable.NEXT_READY) {
+ continue;
}
+
+ // Otherwise, we need to break out of this loop so that we can trigger root connectables or complete dataflow
+ break;
}
- completionReached = !tracker.isAnyReady();
+
+ // We have reached completion if the tracker does not know of any components ready to be triggered AND
+ // we have no data queued in the flow (with the exception of Output Ports).
+ completionReached = !tracker.isAnyReady() && isFlowQueueEmpty();
}
} catch (final Throwable t) {
if (t instanceof TerminatedTaskException) {
@@ -106,6 +103,29 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
}
}
+ /**
+ * Returns <code>true</code> if all data in the flow has been fully processed. This includes both 'internal queues'
+ * that are available via the executionProgress, as well as considering any data that has been consumed from the queues by
+ * the 'rootConnectables' that has not yet completed processing
+ *
+ * @return <code>true</code> if all FlowFiles have completed processing and no data is available, <code>false</code> otherwise
+ */
+ private boolean isFlowQueueEmpty() {
+ if (executionProgress.isDataQueued()) {
+ return false;
+ }
+
+ for (final Connectable rootConnectable : rootConnectables) {
+ for (final Connection connection : rootConnectable.getIncomingConnections()) {
+ if (connection.getFlowFileQueue().isUnacknowledgedFlowFile()) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
private void triggerRootConnectables() {
for (final Connectable connectable : rootConnectables) {
currentComponent = connectable;
@@ -202,7 +222,7 @@ public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
SOURCE_CONNECTABLE,
- NONE;
+ NONE
}
public static class Builder {
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StatelessFlowCurrent.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StatelessFlowCurrent.java
index 9accf7c..aaf2412 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StatelessFlowCurrent.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StatelessFlowCurrent.java
@@ -17,6 +17,14 @@
package org.apache.nifi.stateless.flow;
+/**
+ * The StatelessFlowCurrent is responsible for facilitating the flow of data. Just as a current of air, water, etc.
+ * is the body that moves, the StatelessFlowCurrent is the piece of the Stateless framework that deals with the movement of data.
+ */
public interface StatelessFlowCurrent {
+ /**
+ * Triggers the dataflow, starting from 'root' or 'source' components all the way through the end of the dataflow until either
+ * the source components provide no data or all data that is provided is processed.
+ */
void triggerFlow();
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java
index 1cfe659..15a5b4d 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java
@@ -63,19 +63,44 @@ public class AsynchronousCommitTracker {
}
}
+
+ public Connectable getNextReady() {
+ if (ready.isEmpty()) {
+ return null;
+ }
+
+ Connectable last = null;
+ for (final Connectable connectable : ready) {
+ last = connectable;
+ }
+
+ return last;
+ }
+
public List<Connectable> getReady() {
- final List<Connectable> connectables = new ArrayList<>(ready);
- Collections.reverse(connectables);
- return connectables;
+ final List<Connectable> reversed = new ArrayList<>(ready);
+ Collections.reverse(reversed);
+ return reversed;
}
+ /**
+ * Determines if there are any components that may be ready to be triggered. Note that a value of <code>true</code> may be returned, even if there are no components
+ * that currently are ready according to {@link #isReady(Connectable)}.
+ *
+ * @return <code>true</code> if any component is expected to be ready to trigger, <code>false</code> otherwise
+ */
public boolean isAnyReady() {
final boolean anyReady = !ready.isEmpty();
-
logger.debug("{} Any components ready = {}, list={}", this, anyReady, ready);
return anyReady;
}
+ /**
+ * Checks if the given component is ready to be triggered and if not removes the component from the internal list of ready components
+ *
+ * @param connectable the component to check
+ * @return <code>true</code> if the component is ready to be triggered, <code>false</code> otherwise
+ */
public boolean isReady(final Connectable connectable) {
if (!ready.contains(connectable)) {
logger.debug("{} {} is not ready because it's not in the list of ready components", this, connectable);
@@ -94,7 +119,12 @@ public class AsynchronousCommitTracker {
return true;
}
- logger.debug("{} {} is not ready because it has no data queued", this, connectable);
+ if (connectable.isTriggerWhenEmpty() && isDataHeld(connectable)) {
+ logger.debug("{} {} is ready because it is triggered when its input queue is empty and has unacknowledged data", this, connectable);
+ return true;
+ }
+
+ logger.debug("{} {} is not ready because it has no data queued or held (or has no data queued and is not to be triggered when input queue is empty)", this, connectable);
ready.remove(connectable);
return false;
}
@@ -111,7 +141,22 @@ public class AsynchronousCommitTracker {
private boolean isDataQueued(final Connectable connectable) {
for (final Connection incoming : connectable.getIncomingConnections()) {
- if (!incoming.getFlowFileQueue().isEmpty()) {
+ if (!incoming.getFlowFileQueue().isActiveQueueEmpty()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Determines if data is currently being held by the given connectable (i.e., it has at least one incoming Connection with unacknowledged FlowFiles)
+ * @param connectable the connectable to check
+ * @return <code>true</code> if the Connectable is holding onto data, <code>false</code> otherwise
+ */
+ private boolean isDataHeld(final Connectable connectable) {
+ for (final Connection incoming : connectable.getIncomingConnections()) {
+ if (incoming.getFlowFileQueue().isUnacknowledgedFlowFile()) {
return true;
}
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/session/TestAsynchronousCommitTracker.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/session/TestAsynchronousCommitTracker.java
index abdca73..78bdbe8 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/session/TestAsynchronousCommitTracker.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/session/TestAsynchronousCommitTracker.java
@@ -44,20 +44,25 @@ public class TestAsynchronousCommitTracker {
tracker.addConnectable(connectable1);
assertEquals(Collections.singletonList(connectable1), tracker.getReady());
+ assertEquals(connectable1, tracker.getNextReady());
tracker.addConnectable(connectable2);
assertEquals(Arrays.asList(connectable2, connectable1), tracker.getReady());
+ assertEquals(connectable2, tracker.getNextReady());
tracker.addConnectable(connectable3);
assertEquals(Arrays.asList(connectable3, connectable2, connectable1), tracker.getReady());
+ assertEquals(connectable3, tracker.getNextReady());
// connectable1 should now be moved to the start of the List
tracker.addConnectable(connectable1);
assertEquals(Arrays.asList(connectable1, connectable3, connectable2), tracker.getReady());
+ assertEquals(connectable1, tracker.getNextReady());
// Adding connectable1 again should have no effect since it is already first
tracker.addConnectable(connectable1);
assertEquals(Arrays.asList(connectable1, connectable3, connectable2), tracker.getReady());
+ assertEquals(connectable1, tracker.getNextReady());
}
@Test
@@ -86,7 +91,7 @@ public class TestAsynchronousCommitTracker {
final Connection connection = Mockito.mock(Connection.class);
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
Mockito.when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
- Mockito.when(flowFileQueue.isEmpty()).thenReturn(false);
+ Mockito.when(flowFileQueue.isActiveQueueEmpty()).thenReturn(false);
Mockito.when(connectable2.getIncomingConnections()).thenReturn(Collections.singletonList(connection));
assertTrue(tracker.isReady(connectable2));
@@ -95,7 +100,7 @@ public class TestAsynchronousCommitTracker {
// If we then indicate that the FlowFileQueue is empty, we should see that Connectable2 is no longer ready and it should be evicted from the collection of ready components.
// This should then also result in isAnyReady() being false.
- Mockito.when(flowFileQueue.isEmpty()).thenReturn(true);
+ Mockito.when(flowFileQueue.isActiveQueueEmpty()).thenReturn(true);
assertFalse(tracker.isReady(connectable2));
assertFalse(tracker.isAnyReady());
assertEquals(Collections.emptyList(), tracker.getReady());
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/MergingIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/MergingIT.java
index 25e0e3e..098ca99 100644
--- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/MergingIT.java
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/MergingIT.java
@@ -17,9 +17,9 @@
package org.apache.nifi.stateless.basics;
-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/RequiresAdditionalInputIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/RequiresAdditionalInputIT.java
index 519b804..4fed602 100644
--- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/RequiresAdditionalInputIT.java
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/RequiresAdditionalInputIT.java
@@ -17,10 +17,11 @@
package org.apache.nifi.stateless.basics;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.Relationship;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
@@ -28,11 +29,15 @@ import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TransactionThresholds;
import org.apache.nifi.stateless.flow.TriggerResult;
+import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -46,6 +51,130 @@ import static org.junit.Assert.assertTrue;
public class RequiresAdditionalInputIT extends StatelessSystemIT {
@Test
+    public void testMergeAsFirstProcessor() throws IOException, StatelessConfigurationException, InterruptedException {
+        // Flow under test: In -> ConcatenateRangeOfFlowFiles -> Out. Three FlowFiles ("1", "2", "3") are
+        // enqueued before a single trigger; exactly one output FlowFile with content "123" is expected.
+        final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+        final VersionedPort inPort = flowBuilder.createInputPort("In");
+        final VersionedPort outPort = flowBuilder.createOutputPort("Out");
+
+        final VersionedProcessor merge = flowBuilder.createSimpleProcessor("ConcatenateRangeOfFlowFiles");
+        merge.setAutoTerminatedRelationships(new HashSet<>(Arrays.asList("original", "failure")));
+
+        flowBuilder.createConnection(inPort, merge, Relationship.ANONYMOUS.getName());
+        flowBuilder.createConnection(merge, outPort, "merged");
+
+        // Start up the dataflow
+        final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList(), Collections.emptySet(), createTransactionThresholds(1000));
+
+        // Enqueue data and trigger
+        for (int i = 1; i <= 3; i++) {
+            dataflow.enqueue(String.valueOf(i).getBytes(StandardCharsets.UTF_8), Collections.emptyMap(), "In");
+        }
+
+        final DataflowTrigger trigger = dataflow.trigger();
+        final TriggerResult result = trigger.getResult();
+        assertTrue(result.isSuccessful());
+
+        final List<FlowFile> flowFiles = result.getOutputFlowFiles("Out");
+        Assert.assertEquals(1, flowFiles.size());
+
+        final FlowFile first = flowFiles.get(0);
+        // Decode with the same explicit charset used when enqueuing, so the assertion does not depend
+        // on the platform default charset.
+        final String outputContent = new String(result.readContentAsByteArray(first), StandardCharsets.UTF_8);
+        Assert.assertEquals("123", outputContent);
+
+        result.acknowledge();
+    }
+
+
+ @Test
+    public void testMergeAsFirstProcessorWithoutEnoughData() throws IOException, StatelessConfigurationException, InterruptedException {
+        // Flow under test: In -> ConcatenateRangeOfFlowFiles -> Out, where the processor is configured
+        // with "Minimum Number of Entries" = 100 but only three FlowFiles are enqueued. The trigger is
+        // expected to produce no result within the two-second wait.
+        final VersionedFlowBuilder builder = new VersionedFlowBuilder();
+        final VersionedPort input = builder.createInputPort("In");
+        final VersionedPort output = builder.createOutputPort("Out");
+
+        final VersionedProcessor concatenate = builder.createSimpleProcessor("ConcatenateRangeOfFlowFiles");
+        final Map<String, String> concatenateProperties = Collections.singletonMap("Minimum Number of Entries", "100");
+        concatenate.setProperties(concatenateProperties);
+        final Set<String> autoTerminated = new HashSet<>(Arrays.asList("original", "failure"));
+        concatenate.setAutoTerminatedRelationships(autoTerminated);
+
+        builder.createConnection(input, concatenate, Relationship.ANONYMOUS.getName());
+        builder.createConnection(concatenate, output, "merged");
+
+        // Start up the dataflow
+        final StatelessDataflow flow = loadDataflow(builder.getFlowSnapshot(), Collections.emptyList(), Collections.emptySet(), createTransactionThresholds(1000));
+
+        // Enqueue three small FlowFiles, then trigger once
+        for (final String payload : Arrays.asList("1", "2", "3")) {
+            flow.enqueue(payload.getBytes(StandardCharsets.UTF_8), Collections.emptyMap(), "In");
+        }
+
+        final DataflowTrigger pendingTrigger = flow.trigger();
+
+        // We expect this to time out: no result should be available after two seconds
+        assertFalse(pendingTrigger.getResult(2, TimeUnit.SECONDS).isPresent());
+
+        pendingTrigger.cancel();
+    }
+
+
+ @Test
+    public void testMergeDownstream() throws IOException, StatelessConfigurationException, InterruptedException {
+        // Flow under test: In -> UpdateContent (append "\n1") -> UpdateContent (append "\n2")
+        // -> UpdateContent (append "\n3") -> ConcatenateRangeOfFlowFiles -> Out.
+        final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+        final VersionedPort inPort = flowBuilder.createInputPort("In");
+        final VersionedPort outPort = flowBuilder.createOutputPort("Out");
+
+        final VersionedProcessor firstUpdate = createAppendingUpdateContent(flowBuilder, "\n1");
+        final VersionedProcessor secondUpdate = createAppendingUpdateContent(flowBuilder, "\n2");
+        final VersionedProcessor thirdUpdate = createAppendingUpdateContent(flowBuilder, "\n3");
+
+        final VersionedProcessor merge = flowBuilder.createSimpleProcessor("ConcatenateRangeOfFlowFiles");
+        merge.setAutoTerminatedRelationships(new HashSet<>(Arrays.asList("original", "failure")));
+
+        flowBuilder.createConnection(inPort, firstUpdate, Relationship.ANONYMOUS.getName());
+        flowBuilder.createConnection(firstUpdate, secondUpdate, "success");
+        flowBuilder.createConnection(secondUpdate, thirdUpdate, "success");
+        flowBuilder.createConnection(thirdUpdate, merge, "success");
+        flowBuilder.createConnection(merge, outPort, "merged");
+
+        // Start up the dataflow
+        final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList(), Collections.emptySet(), createTransactionThresholds(1000));
+
+        // Enqueue data and trigger
+        for (int i = 1; i <= 3; i++) {
+            dataflow.enqueue(("hello " + i).getBytes(StandardCharsets.UTF_8), Collections.emptyMap(), "In");
+        }
+
+        final DataflowTrigger trigger = dataflow.trigger();
+        final TriggerResult result = trigger.getResult();
+        assertTrue(result.isSuccessful());
+
+        final List<FlowFile> out = result.getOutputFlowFiles("Out");
+        assertEquals(1, out.size());
+        final byte[] outputContent = result.readContentAsByteArray(out.get(0));
+        final String outputText = new String(outputContent, StandardCharsets.UTF_8);
+
+        // Each input is expected to carry all three appended suffixes, concatenated in enqueue order
+        final StringBuilder expectedContentBuilder = new StringBuilder();
+        for (int i = 1; i <= 3; i++) {
+            expectedContentBuilder.append("hello ").append(i).append("\n1\n2\n3");
+        }
+        final String expectedContent = expectedContentBuilder.toString();
+        assertEquals(expectedContent, outputText);
+
+        // Acknowledge the result, consistent with testMergeAsFirstProcessor
+        result.acknowledge();
+    }
+
+    // Creates an UpdateContent processor configured to append the given suffix to each FlowFile's content.
+    private static VersionedProcessor createAppendingUpdateContent(final VersionedFlowBuilder flowBuilder, final String suffix) {
+        final VersionedProcessor processor = flowBuilder.createSimpleProcessor("UpdateContent");
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("Content", suffix);
+        properties.put("Update Strategy", "Append");
+        processor.setProperties(properties);
+        return processor;
+    }
+
+ @Test
public void testSourceProcessorsTriggeredAsOftenAsRequired() throws IOException, StatelessConfigurationException, InterruptedException {
// Build the flow
final int flowFileCount = 12;
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/pom.xml b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/pom.xml
index 8fa0006..242730b 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/pom.xml
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/pom.xml
@@ -39,6 +39,11 @@
<artifactId>nifi-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-processor-utils</artifactId>
+ <version>1.16.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java
index 0420d49..b2ed768 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.tests.system;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.flowfile.FlowFile;
@@ -36,6 +37,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+@TriggerWhenEmpty
public class ConcatenateFlowFiles extends AbstractProcessor {
static final PropertyDescriptor FLOWFILE_COUNT = new Builder()
.name("FlowFile Count")
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateRangeOfFlowFiles.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateRangeOfFlowFiles.java
new file mode 100644
index 0000000..cefc4e6
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateRangeOfFlowFiles.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.bin.Bin;
+import org.apache.nifi.processor.util.bin.BinFiles;
+import org.apache.nifi.processor.util.bin.BinManager;
+import org.apache.nifi.processor.util.bin.BinProcessingResult;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * System-test processor built on {@link BinFiles} that merges the contents of each bin into a
+ * single FlowFile and routes it to the "merged" relationship.
+ */
+@TriggerSerially
+@TriggerWhenEmpty
+public class ConcatenateRangeOfFlowFiles extends BinFiles {
+    public static final Relationship REL_MERGED = new Relationship.Builder()
+        .name("merged")
+        .description("The FlowFile containing the merged content")
+        .build();
+
+    /**
+     * @return the inherited original and failure relationships plus {@link #REL_MERGED}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> supported = new HashSet<>();
+        supported.add(REL_ORIGINAL);
+        supported.add(REL_FAILURE);
+        supported.add(REL_MERGED);
+        return supported;
+    }
+
+
+    /**
+     * @return the inherited bin-threshold properties (entry counts, sizes, bin age and bin count)
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> supported = new ArrayList<>();
+        supported.add(MIN_ENTRIES);
+        supported.add(MAX_ENTRIES);
+        supported.add(MIN_SIZE);
+        supported.add(MAX_SIZE);
+        supported.add(MAX_BIN_AGE);
+        supported.add(MAX_BIN_COUNT);
+        return supported;
+    }
+
+    /**
+     * No preprocessing is performed; the FlowFile is returned unchanged.
+     */
+    @Override
+    protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+        return flowFile;
+    }
+
+    /**
+     * Always returns {@code null}, i.e. no FlowFile is assigned a distinct group identifier.
+     */
+    @Override
+    protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) {
+        return null;
+    }
+
+    @Override
+    protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
+        // Intentionally empty: this processor applies no additional bin manager configuration.
+    }
+
+    /**
+     * Merges every FlowFile in the bin into one new FlowFile, using the bin's own session, and
+     * transfers it to {@link #REL_MERGED}.
+     *
+     * @return a result that is constructed with {@code true} and then explicitly marked as not committed
+     */
+    @Override
+    protected BinProcessingResult processBin(final Bin bin, final ProcessContext context) throws ProcessException {
+        final ProcessSession binSession = bin.getSession();
+        final List<FlowFile> binContents = bin.getContents();
+
+        // Create the child FlowFile from the bin contents, then merge their content into it
+        final FlowFile concatenated = binSession.merge(binContents, binSession.create(binContents));
+        binSession.transfer(concatenated, REL_MERGED);
+
+        getLogger().info("Concatenated {} FlowFiles into {}", binContents.size(), concatenated);
+
+        // NOTE(review): presumably leaves committing the bin's session to the caller; confirm against BinFiles
+        final BinProcessingResult outcome = new BinProcessingResult(true);
+        outcome.setCommitted(false);
+        return outcome;
+    }
+}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateContent.java
similarity index 51%
copy from nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java
copy to nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateContent.java
index 0420d49..2b5da94 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ConcatenateFlowFiles.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateContent.java
@@ -19,73 +19,72 @@ package org.apache.nifi.processors.tests.system;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.StreamUtils;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
-public class ConcatenateFlowFiles extends AbstractProcessor {
- static final PropertyDescriptor FLOWFILE_COUNT = new Builder()
- .name("FlowFile Count")
- .displayName("FlowFile Count")
- .description("Number of FlowFiles to concatenate together")
+/**
+ * System-test processor that writes the configured "Content" value into each incoming FlowFile,
+ * either replacing the existing content or appending to it depending on "Update Strategy", and
+ * then routes the FlowFile to "success".
+ */
+public class UpdateContent extends AbstractProcessor {
+
+ static final PropertyDescriptor CONTENT = new Builder()
+ .name("Content")
+ .displayName("Content")
+ .description("Content to set")
.required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .addValidator(Validator.VALID)
+ .defaultValue("Default Content")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
-
- static final Relationship ORIGINAL = new Relationship.Builder()
- .name("original")
+ static final PropertyDescriptor UPDATE_STRATEGY = new Builder()
+ .name("Update Strategy")
+ .displayName("Update Strategy")
+ .description("How to update the contents")
+ .required(true)
+ .allowableValues("Replace", "Append")
+ .defaultValue("Replace")
.build();
- static final Relationship MERGED = new Relationship.Builder()
- .name("merged")
+
+ // Declared static like the property descriptors above: it is a constant, not per-instance state.
+ private static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return Collections.singletonList(FLOWFILE_COUNT);
+ return Arrays.asList(CONTENT, UPDATE_STRATEGY);
}
@Override
public Set<Relationship> getRelationships() {
- return new HashSet<>(Arrays.asList(ORIGINAL, MERGED));
+ return Collections.singleton(REL_SUCCESS);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final int flowFileCount = context.getProperty(FLOWFILE_COUNT).asInteger();
- final List<FlowFile> flowFiles = session.get(flowFileCount);
- if (flowFiles.size() != flowFileCount) {
- session.rollback();
- context.yield();
- getLogger().debug("Need {} FlowFiles but currently on {} are available. Will not merge.", flowFileCount, flowFiles.size());
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
return;
}
- FlowFile merged = session.create(flowFiles);
- try (final OutputStream out = session.write(merged)) {
- for (final FlowFile input : flowFiles) {
- try (final InputStream in = session.read(input)) {
- StreamUtils.copy(in, out);
- }
- }
- } catch (final Exception e) {
- throw new ProcessException("Failed to merge", e);
+ final String content = context.getProperty(CONTENT).evaluateAttributeExpressions(flowFile).getValue();
+ final byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
+
+ // write/append return the updated FlowFile; keep that reference so the latest version is transferred.
+ final String strategy = context.getProperty(UPDATE_STRATEGY).getValue();
+ if (strategy.equalsIgnoreCase("Replace")) {
+ flowFile = session.write(flowFile, out -> out.write(contentBytes));
+ } else {
+ flowFile = session.append(flowFile, out -> out.write(contentBytes));
}
- session.transfer(merged, MERGED);
- session.transfer(flowFiles, ORIGINAL);
+ session.transfer(flowFile, REL_SUCCESS);
}
-
}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index c39c5bf..77143b1 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,6 +16,7 @@
org.apache.nifi.processors.tests.system.CountEvents
org.apache.nifi.processors.tests.system.CountFlowFiles
org.apache.nifi.processors.tests.system.ConcatenateFlowFiles
+org.apache.nifi.processors.tests.system.ConcatenateRangeOfFlowFiles
org.apache.nifi.processors.tests.system.DependOnProperties
org.apache.nifi.processors.tests.system.DoNotTransferFlowFile
org.apache.nifi.processors.tests.system.Duplicate
@@ -38,6 +39,7 @@ org.apache.nifi.processors.tests.system.SplitByLine
org.apache.nifi.processors.tests.system.TerminateFlowFile
org.apache.nifi.processors.tests.system.TransferBatch
org.apache.nifi.processors.tests.system.ThrowProcessException
+org.apache.nifi.processors.tests.system.UpdateContent
org.apache.nifi.processors.tests.system.ValidateFileExists
org.apache.nifi.processors.tests.system.VerifyContents
org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile