You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/03/09 20:27:12 UTC

[nifi] branch main updated (c73573b -> 4a46b08)

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from c73573b  NIFI-9761 Correct PeerChannel processing for TLS 1.3 (#5836)
     new 73356ea  NIFI-9783: This closes #5855. When migrating FlowFiles from one ProcessSession to another, if any FlowFile had already been transferred, and the Relationship to which it was transferred was auto-terminated, we were updating the wrong member variable, which threw off our stats for the processor. Fixed that.
     new 4a46b08  NIFI-9782 This closes #5854. Excluded H2 DB from nifi-druid-bundle

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 nifi-nar-bundles/nifi-druid-bundle/pom.xml         |  4 +++
 .../repository/StandardProcessSession.java         | 21 +++++++++++---
 .../repository/StandardProcessSessionIT.java       | 33 ++++++++++++++++++++++
 3 files changed, 54 insertions(+), 4 deletions(-)

[nifi] 02/02: NIFI-9782 This closes #5854. Excluded H2 DB from nifi-druid-bundle

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 4a46b087b866205fb85d92853fc82398612d35c9
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Wed Mar 9 11:18:46 2022 -0600

    NIFI-9782 This closes #5854. Excluded H2 DB from nifi-druid-bundle
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 nifi-nar-bundles/nifi-druid-bundle/pom.xml | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
index fcc34be..cff1cd4 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
@@ -97,6 +97,10 @@
                         <groupId>commons-logging</groupId>
                         <artifactId>commons-logging</artifactId>
                     </exclusion>
+                    <exclusion>
+                        <groupId>com.h2database</groupId>
+                        <artifactId>h2</artifactId>
+                    </exclusion>
                 </exclusions>
             </dependency>
             <dependency>

[nifi] 01/02: NIFI-9783: This closes #5855. When migrating FlowFiles from one ProcessSession to another, if any FlowFile had already been transferred, and the Relationship to which it was transferred was auto-terminated, we were updating the wrong member variable, which threw off our stats for the processor. Fixed that.

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 73356ea448dced6a789f2d71c5cb5aceac45520e
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Mar 9 13:44:08 2022 -0500

    NIFI-9783: This closes #5855. When migrating FlowFiles from one ProcessSession to another, if any FlowFile had already been transferred, and the Relationship to which it was transferred was auto-terminated, we were updating the wrong member variable, which threw off our stats for the processor. Fixed that.
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../repository/StandardProcessSession.java         | 21 +++++++++++---
 .../repository/StandardProcessSessionIT.java       | 33 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 5bec8a6..bf1a1ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1599,11 +1599,24 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
             }
 
             if (repoRecord.getTransferRelationship() != null) {
-                flowFilesOut--;
-                contentSizeOut -= flowFile.getSize();
+                final Relationship transferRelationship = repoRecord.getTransferRelationship();
+                final Collection<Connection> destinations = context.getConnections(transferRelationship);
+                final int numDestinations = destinations.size();
+                final boolean autoTerminated = numDestinations == 0 && context.getConnectable().isAutoTerminated(transferRelationship);
 
-                newOwner.flowFilesOut++;
-                newOwner.contentSizeOut += flowFile.getSize();
+                if (autoTerminated) {
+                    removedCount--;
+                    removedBytes -= flowFile.getSize();
+
+                    newOwner.removedCount++;
+                    newOwner.removedBytes += flowFile.getSize();
+                } else {
+                    flowFilesOut--;
+                    contentSizeOut -= flowFile.getSize();
+
+                    newOwner.flowFilesOut++;
+                    newOwner.contentSizeOut += flowFile.getSize();
+                }
             }
 
             final List<ProvenanceEventRecord> events = generatedProvenanceEvents.remove(flowFile);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index dc51885..4e300d1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -2453,6 +2453,39 @@ public class StandardProcessSessionIT {
     }
 
     @Test
+    public void testMigrateAfterTransferToAutoTerminatedRelationship() {
+        final long start = System.currentTimeMillis();
+
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, out -> out.write("Hello".getBytes(StandardCharsets.UTF_8)));
+
+        final StandardProcessSession newSession = new StandardProcessSession(context, () -> false);
+
+        when(connectable.getConnections(any(Relationship.class))).thenReturn(Collections.emptySet());
+        when(connectable.isAutoTerminated(any(Relationship.class))).thenReturn(true);
+
+        session.transfer(flowFile, new Relationship.Builder().name("success").build());
+        session.migrate(newSession, Collections.singleton(flowFile));
+
+        session.commit();
+
+        RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(start - 1);
+        FlowFileEvent event = report.getReportEntries().values().iterator().next();
+        assertEquals(0, event.getFlowFilesRemoved());
+        assertEquals(0, event.getContentSizeRemoved());
+        assertEquals(0, event.getFlowFilesOut());
+        assertEquals(0, event.getContentSizeOut());
+
+        newSession.commit();
+        report = flowFileEventRepository.reportTransferEvents(start - 1);
+        event = report.getReportEntries().values().iterator().next();
+        assertEquals(1, event.getFlowFilesRemoved());
+        assertEquals(5, event.getContentSizeRemoved());
+        assertEquals(0, event.getFlowFilesOut());
+        assertEquals(0, event.getContentSizeOut());
+    }
+
+    @Test
     public void testNewFlowFileModifiedMultipleTimesHasTransientClaimsOnCommit() {
         FlowFile flowFile = session.create();
         for (int i = 0; i < 5; i++) {