You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by gr...@apache.org on 2022/07/12 13:53:20 UTC
[nifi] branch main updated: NIFI-10203: Fixed bug in which same FlowFile Builder was used repeatedly for multiple FlowFiles; this caused mingling of their attributes when a FlowFile is routed to a relationship that has more than 1 destination (i.e., many connections with the same Relationship)
This is an automated email from the ASF dual-hosted git repository.
greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 996d8faaf4 NIFI-10203: Fixed bug in which same FlowFile Builder was used repeatedly for multiple FlowFiles; this caused mingling of their attributes when a FlowFile is routed to a relationship that has more than 1 destination (i.e., many connections with the same Relationship)
996d8faaf4 is described below
commit 996d8faaf48afa97591534c1c2f8dca1aa8c68d5
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Jul 8 15:21:44 2022 -0400
NIFI-10203: Fixed bug in which same FlowFile Builder was used repeatedly for multiple FlowFiles; this caused mingling of their attributes when a FlowFile is routed to a relationship that has more than 1 destination (i.e., many connections with the same Relationship)
This closes #6187
Signed-off-by: Paul Grey <gr...@apache.org>
---
.../repository/StandardProcessSession.java | 11 +++---
.../repository/StandardProcessSessionIT.java | 44 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 6 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 7aefec57e8..309b5da16b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -348,17 +348,16 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
}
}
} else {
- final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
final FlowFileRecord currRec = record.getCurrent();
- final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
- builder.removeAttributes(retryAttribute);
-
+ final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
record.setDestination(finalDestination.getFlowFileQueue());
- record.setWorking(builder.build(), false);
incrementConnectionInputCounts(finalDestination, record);
for (final Connection destination : destinations) { // iterate over remaining destinations and "clone" as needed
incrementConnectionInputCounts(destination, record);
+
+ final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
+ builder.removeAttributes(retryAttribute);
builder.id(context.getNextFlowFileSequence());
final String newUuid = UUID.randomUUID().toString();
@@ -372,7 +371,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
if (claim != null) {
context.getContentRepository().incrementClaimaintCount(claim);
}
- newRecord.setWorking(clone, Collections.<String, String>emptyMap(), false);
+ newRecord.setWorking(clone, Collections.emptyMap(), false);
newRecord.setDestination(destination.getFlowFileQueue());
newRecord.setTransferRelationship(record.getTransferRelationship());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index 81bbc7f003..a4e6b399b7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -100,6 +100,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -418,6 +419,49 @@ public class StandardProcessSessionIT {
assertEquals(childUuids, new HashSet<>(fork.getChildUuids()));
}
+ @Test // Regression test for NIFI-10203: one FlowFile routed to a relationship with three connections must produce three FlowFiles with distinct UUIDs that all keep the "abc" attribute.
+ public void testCloneOnMultipleDestinations() {
+ final String originalUuid = "12345678-1234-1234-1234-123456789012";
+ final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder()
+ .id(1000L)
+ .addAttribute("uuid", originalUuid)
+ .addAttribute("abc", "xyz")
+ .entryDate(System.currentTimeMillis());
+
+ flowFileQueue.put(flowFileRecordBuilder.build()); // enqueue the single input FlowFile for the session to pick up
+
+ FlowFile flowFile = session.get();
+ assertNotNull(flowFile);
+
+ final List<Connection> connectionList = new ArrayList<>();
+ for (int i=0; i < 3; i++) {
+ connectionList.add(createConnection());
+ }
+
+ when(connectable.getConnections(any(Relationship.class))).thenReturn(new HashSet<>(connectionList)); // stub: any relationship resolves to all three connections
+
+ session.transfer(flowFile, Relationship.ANONYMOUS);
+ session.commit(); // the test expects the outputs to be in the connection queues after this call
+
+ final List<FlowFileRecord> outputFlowFiles = new ArrayList<>();
+ for (final Connection connection : connectionList) {
+ final FlowFileRecord outputFlowFile = connection.getFlowFileQueue().poll(Collections.emptySet()); // presumably the argument is the set that receives expired records - TODO confirm against FlowFileQueue.poll
+ outputFlowFiles.add(outputFlowFile);
+ }
+
+ assertEquals(3, outputFlowFiles.size()); // NOTE(review): always 3 because every poll result is added unconditionally; a null poll result would only surface in the stream below
+
+ final Set<String> uuids = outputFlowFiles.stream()
+ .map(ff -> ff.getAttribute("uuid"))
+ .collect(Collectors.toSet());
+
+ assertEquals(3, uuids.size()); // collecting into a Set means 3 entries implies all UUIDs are distinct
+ assertTrue(uuids.contains(originalUuid)); // one of the outputs must still carry the original UUID
+
+ final Predicate<FlowFileRecord> attributeAbcMatches = ff -> ff.getAttribute("abc").equals("xyz"); // NOTE(review): if getAttribute returns null for a missing key this throws NPE instead of failing the assertion - confirm
+ assertTrue(outputFlowFiles.stream().allMatch(attributeAbcMatches)); // every output, original and clones alike, must retain abc=xyz
+ }
+
@Test
public void testCheckpointOnSessionDoesNotInteractWithFlowFile() {
final Relationship relationship = new Relationship.Builder().name("A").build();