You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by gr...@apache.org on 2022/07/12 13:53:20 UTC

[nifi] branch main updated: NIFI-10203: Fixed bug in which same FlowFile Builder was used repeatedly for multiple FlowFiles; this caused mingling of their attributes when a FlowFile is routed to a relationship that has more than 1 destination (i.e., many connections with the same Relationship)

This is an automated email from the ASF dual-hosted git repository.

greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 996d8faaf4 NIFI-10203: Fixed bug in which same FlowFile Builder was used repeatedly for multiple FlowFiles; this caused mingling of their attributes when a FlowFile is routed to a relationship that has more than 1 destination (i.e., many connections with the same Relationship)
996d8faaf4 is described below

commit 996d8faaf48afa97591534c1c2f8dca1aa8c68d5
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Jul 8 15:21:44 2022 -0400

    NIFI-10203: Fixed bug in which same FlowFile Builder was used repeatedly for multiple FlowFiles; this caused mingling of their attributes when a FlowFile is routed to a relationship that has more than 1 destination (i.e., many connections with the same Relationship)
    
    This closes #6187
    Signed-off-by: Paul Grey <gr...@apache.org>
---
 .../repository/StandardProcessSession.java         | 11 +++---
 .../repository/StandardProcessSessionIT.java       | 44 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 6 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 7aefec57e8..309b5da16b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -348,17 +348,16 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
                     }
                 }
             } else {
-                final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
                 final FlowFileRecord currRec = record.getCurrent();
-                final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
-                builder.removeAttributes(retryAttribute);
-
+                final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
                 record.setDestination(finalDestination.getFlowFileQueue());
-                record.setWorking(builder.build(), false);
                 incrementConnectionInputCounts(finalDestination, record);
 
                 for (final Connection destination : destinations) { // iterate over remaining destinations and "clone" as needed
                     incrementConnectionInputCounts(destination, record);
+
+                    final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
+                    builder.removeAttributes(retryAttribute);
                     builder.id(context.getNextFlowFileSequence());
 
                     final String newUuid = UUID.randomUUID().toString();
@@ -372,7 +371,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
                     if (claim != null) {
                         context.getContentRepository().incrementClaimaintCount(claim);
                     }
-                    newRecord.setWorking(clone, Collections.<String, String>emptyMap(), false);
+                    newRecord.setWorking(clone, Collections.emptyMap(), false);
 
                     newRecord.setDestination(destination.getFlowFileQueue());
                     newRecord.setTransferRelationship(record.getTransferRelationship());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index 81bbc7f003..a4e6b399b7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -100,6 +100,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -418,6 +419,49 @@ public class StandardProcessSessionIT {
         assertEquals(childUuids, new HashSet<>(fork.getChildUuids()));
     }
 
+    @Test
+    public void testCloneOnMultipleDestinations() {
+        final String originalUuid = "12345678-1234-1234-1234-123456789012";
+        final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", originalUuid)
+            .addAttribute("abc", "xyz")
+            .entryDate(System.currentTimeMillis());
+
+        flowFileQueue.put(flowFileRecordBuilder.build());
+
+        FlowFile flowFile = session.get();
+        assertNotNull(flowFile);
+
+        final List<Connection> connectionList = new ArrayList<>();
+        for (int i=0; i < 3; i++) {
+            connectionList.add(createConnection());
+        }
+
+        when(connectable.getConnections(any(Relationship.class))).thenReturn(new HashSet<>(connectionList));
+
+        session.transfer(flowFile, Relationship.ANONYMOUS);
+        session.commit();
+
+        final List<FlowFileRecord> outputFlowFiles = new ArrayList<>();
+        for (final Connection connection : connectionList) {
+            final FlowFileRecord outputFlowFile = connection.getFlowFileQueue().poll(Collections.emptySet());
+            outputFlowFiles.add(outputFlowFile);
+        }
+
+        assertEquals(3, outputFlowFiles.size());
+
+        final Set<String> uuids = outputFlowFiles.stream()
+            .map(ff -> ff.getAttribute("uuid"))
+            .collect(Collectors.toSet());
+
+        assertEquals(3, uuids.size());
+        assertTrue(uuids.contains(originalUuid));
+
+        final Predicate<FlowFileRecord> attributeAbcMatches = ff -> ff.getAttribute("abc").equals("xyz");
+        assertTrue(outputFlowFiles.stream().allMatch(attributeAbcMatches));
+    }
+
     @Test
     public void testCheckpointOnSessionDoesNotInteractWithFlowFile() {
         final Relationship relationship = new Relationship.Builder().name("A").build();