You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/03/09 20:27:13 UTC

[nifi] 01/02: NIFI-9783: This closes #5855. When migrating FlowFiles from one ProcessSession to another, if any FlowFile had already been transferred, and the Relationship to which it was transferred was auto-terminated, we were updating the wrong member variable, which threw off our stats for the processor. Fixed that.

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 73356ea448dced6a789f2d71c5cb5aceac45520e
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Mar 9 13:44:08 2022 -0500

    NIFI-9783: This closes #5855. When migrating FlowFiles from one ProcessSession to another, if any FlowFile had already been transferred, and the Relationship to which it was transferred was auto-terminated, we were updating the wrong member variable, which threw off our stats for the processor. Fixed that.
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../repository/StandardProcessSession.java         | 21 +++++++++++---
 .../repository/StandardProcessSessionIT.java       | 33 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 5bec8a6..bf1a1ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1599,11 +1599,24 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
             }
 
             if (repoRecord.getTransferRelationship() != null) {
-                flowFilesOut--;
-                contentSizeOut -= flowFile.getSize();
+                final Relationship transferRelationship = repoRecord.getTransferRelationship();
+                final Collection<Connection> destinations = context.getConnections(transferRelationship);
+                final int numDestinations = destinations.size();
+                final boolean autoTerminated = numDestinations == 0 && context.getConnectable().isAutoTerminated(transferRelationship);
 
-                newOwner.flowFilesOut++;
-                newOwner.contentSizeOut += flowFile.getSize();
+                if (autoTerminated) {
+                    removedCount--;
+                    removedBytes -= flowFile.getSize();
+
+                    newOwner.removedCount++;
+                    newOwner.removedBytes += flowFile.getSize();
+                } else {
+                    flowFilesOut--;
+                    contentSizeOut -= flowFile.getSize();
+
+                    newOwner.flowFilesOut++;
+                    newOwner.contentSizeOut += flowFile.getSize();
+                }
             }
 
             final List<ProvenanceEventRecord> events = generatedProvenanceEvents.remove(flowFile);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index dc51885..4e300d1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -2453,6 +2453,39 @@ public class StandardProcessSessionIT {
     }
 
     @Test
+    public void testMigrateAfterTransferToAutoTerminatedRelationship() {
+        final long start = System.currentTimeMillis();
+
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, out -> out.write("Hello".getBytes(StandardCharsets.UTF_8)));
+
+        final StandardProcessSession newSession = new StandardProcessSession(context, () -> false);
+
+        when(connectable.getConnections(any(Relationship.class))).thenReturn(Collections.emptySet());
+        when(connectable.isAutoTerminated(any(Relationship.class))).thenReturn(true);
+
+        session.transfer(flowFile, new Relationship.Builder().name("success").build());
+        session.migrate(newSession, Collections.singleton(flowFile));
+
+        session.commit();
+
+        RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(start - 1);
+        FlowFileEvent event = report.getReportEntries().values().iterator().next();
+        assertEquals(0, event.getFlowFilesRemoved());
+        assertEquals(0, event.getContentSizeRemoved());
+        assertEquals(0, event.getFlowFilesOut());
+        assertEquals(0, event.getContentSizeOut());
+
+        newSession.commit();
+        report = flowFileEventRepository.reportTransferEvents(start - 1);
+        event = report.getReportEntries().values().iterator().next();
+        assertEquals(1, event.getFlowFilesRemoved());
+        assertEquals(5, event.getContentSizeRemoved());
+        assertEquals(0, event.getFlowFilesOut());
+        assertEquals(0, event.getContentSizeOut());
+    }
+
+    @Test
     public void testNewFlowFileModifiedMultipleTimesHasTransientClaimsOnCommit() {
         FlowFile flowFile = session.create();
         for (int i = 0; i < 5; i++) {