You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/10/04 14:50:13 UTC
[nifi] branch support/nifi-1.x updated: [NIFI-12067] mock process session keeps track of flowfiles created during the session and removes them on rollback rather than putting them on the input queue (#7827)
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 6929ddecb6 [NIFI-12067] mock process session keeps track of flowfiles created during the session and removes them on rollback rather than putting them on the input queue (#7827)
6929ddecb6 is described below
commit 6929ddecb62b247451fc12ca7a82ca33dea7628e
Author: Eric Secules <es...@gmail.com>
AuthorDate: Wed Oct 4 07:48:50 2023 -0700
[NIFI-12067] mock process session keeps track of flowfiles created during the session and removes them on rollback rather than putting them on the input queue (#7827)
Rename method to be more descriptive; fix checkstyle error for trailing whitespace in TestMockProcessSession.java.
Add session.transfer call to unit tests so that they fail without the fixes.
Co-authored-by: Eric Secules <er...@macrohealth.com>
---
.../org/apache/nifi/util/MockProcessSession.java | 48 ++++++++++++++--------
.../apache/nifi/util/TestMockProcessSession.java | 34 +++++++++++++++
2 files changed, 66 insertions(+), 16 deletions(-)
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index ce0d93094e..c408c44e5b 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -69,6 +69,7 @@ public class MockProcessSession implements ProcessSession {
private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>();
private final MockFlowFileQueue processorQueue;
private final Set<Long> beingProcessed = new HashSet<>();
+ private final Set<Long> created = new HashSet<>();
private final List<MockFlowFile> penalized = new ArrayList<>();
private final Processor processor;
@@ -213,6 +214,10 @@ public class MockProcessSession implements ProcessSession {
if (removedFlowFiles.remove(flowFile.getId())) {
newOwner.removedFlowFiles.add(flowFile.getId());
}
+
+ if (created.remove(flowFile.getId())) {
+ newOwner.created.add(flowFile.getId());
+ }
}
final Set<String> flowFileIds = flowFiles.stream()
@@ -226,8 +231,7 @@ public class MockProcessSession implements ProcessSession {
public MockFlowFile clone(FlowFile flowFile) {
flowFile = validateState(flowFile);
final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile);
- currentVersions.put(newFlowFile.getId(), newFlowFile);
- beingProcessed.add(newFlowFile.getId());
+ updateStateWithNewFlowFile(newFlowFile);
return newFlowFile;
}
@@ -242,8 +246,7 @@ public class MockProcessSession implements ProcessSession {
final byte[] newContent = Arrays.copyOfRange(((MockFlowFile) flowFile).getData(), (int) offset, (int) (offset + size));
newFlowFile.setData(newContent);
- currentVersions.put(newFlowFile.getId(), newFlowFile);
- beingProcessed.add(newFlowFile.getId());
+ updateStateWithNewFlowFile(newFlowFile);
return newFlowFile;
}
@@ -290,6 +293,7 @@ public class MockProcessSession implements ProcessSession {
beingProcessed.clear();
currentVersions.clear();
originalVersions.clear();
+ created.clear();
for (final Map.Entry<String, Long> entry : counterMap.entrySet()) {
sharedState.adjustCounter(entry.getKey(), entry.getValue());
@@ -339,8 +343,7 @@ public class MockProcessSession implements ProcessSession {
@Override
public MockFlowFile create() {
final MockFlowFile flowFile = new MockFlowFile(sharedState.nextFlowFileId());
- currentVersions.put(flowFile.getId(), flowFile);
- beingProcessed.add(flowFile.getId());
+ updateStateWithNewFlowFile(flowFile);
return flowFile;
}
@@ -348,8 +351,7 @@ public class MockProcessSession implements ProcessSession {
public MockFlowFile create(final FlowFile flowFile) {
MockFlowFile newFlowFile = create();
newFlowFile = (MockFlowFile) inheritAttributes(flowFile, newFlowFile);
- currentVersions.put(newFlowFile.getId(), newFlowFile);
- beingProcessed.add(newFlowFile.getId());
+ updateStateWithNewFlowFile(newFlowFile);
return newFlowFile;
}
@@ -357,8 +359,7 @@ public class MockProcessSession implements ProcessSession {
public MockFlowFile create(final Collection<FlowFile> flowFiles) {
MockFlowFile newFlowFile = create();
newFlowFile = (MockFlowFile) inheritAttributes(flowFiles, newFlowFile);
- currentVersions.put(newFlowFile.getId(), newFlowFile);
- beingProcessed.add(newFlowFile.getId());
+ updateStateWithNewFlowFile(newFlowFile);
return newFlowFile;
}
@@ -802,9 +803,11 @@ public class MockProcessSession implements ProcessSession {
for (final List<MockFlowFile> list : transferMap.values()) {
for (final MockFlowFile flowFile : list) {
- processorQueue.offer(flowFile);
- if (penalize) {
- penalized.add(flowFile);
+ if (!created.contains(flowFile.getId())) {
+ processorQueue.offer(flowFile);
+ if (penalize) {
+ penalized.add(flowFile);
+ }
}
}
}
@@ -812,9 +815,11 @@ public class MockProcessSession implements ProcessSession {
for (final Long flowFileId : beingProcessed) {
final MockFlowFile flowFile = originalVersions.get(flowFileId);
if (flowFile != null) {
- processorQueue.offer(flowFile);
- if (penalize) {
- penalized.add(flowFile);
+ if (!created.contains(flowFile.getId())) {
+ processorQueue.offer(flowFile);
+ if (penalize) {
+ penalized.add(flowFile);
+ }
}
}
}
@@ -824,6 +829,7 @@ public class MockProcessSession implements ProcessSession {
currentVersions.clear();
originalVersions.clear();
transferMap.clear();
+ created.clear();
clearTransferState();
if (!penalize) {
penalized.clear();
@@ -858,6 +864,15 @@ public class MockProcessSession implements ProcessSession {
mockFlowFile.setEnqueuedIndex(enqueuedIndex.incrementAndGet());
}
+ private void updateStateWithNewFlowFile(MockFlowFile newFlowFile) {
+ if (newFlowFile == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+ beingProcessed.add(newFlowFile.getId());
+ created.add(newFlowFile.getId());
+ }
+
@Override
public void transfer(final Collection<FlowFile> flowFiles) {
for (final FlowFile flowFile : flowFiles) {
@@ -1072,6 +1087,7 @@ public class MockProcessSession implements ProcessSession {
final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination);
newFlowFile.setData(baos.toByteArray());
currentVersions.put(newFlowFile.getId(), newFlowFile);
+ created.add(newFlowFile.getId());
return newFlowFile;
}
diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
index 775bc2f5ed..700d0d29b2 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
@@ -134,6 +134,40 @@ public class TestMockProcessSession {
assertFalse(ff1.isPenalized());
}
+ @Test
+ public void testRollbackWithCreatedFlowFile() {
+ final Processor processor = new PoorlyBehavedProcessor();
+ final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
+ final FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
+ session.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE);
+ session.rollback();
+ session.assertQueueEmpty();
+ }
+
+ @Test
+ public void testRollbackWithClonedFlowFile() {
+ final Processor processor = new PoorlyBehavedProcessor();
+ final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
+ final FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
+ session.clone(ff1);
+ session.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE);
+ session.rollback();
+ session.assertQueueEmpty();
+ }
+
+ @Test
+ public void testRollbackWithMigratedFlowFile() {
+ final Processor processor = new PoorlyBehavedProcessor();
+ final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
+ final MockProcessSession newSession = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
+ final FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
+ session.migrate(newSession);
+ newSession.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE);
+ newSession.rollback();
+ session.assertQueueEmpty();
+ newSession.assertQueueEmpty();
+ }
+
@Test
public void testAttributePreservedAfterWrite() throws IOException {
final Processor processor = new PoorlyBehavedProcessor();