You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/13 18:43:06 UTC

[nifi] 02/22: NIFI-9433: When a Connection is unregistered from the NioAsyncLoadBalanceClient, make sure that we only cancel its active transaction if the transaction belongs to the appropriate connection. Also ensure that when we do cancel a transaction / call its failure callback, we purge the collection of any FlowFiles that have been sent in that transaction. This ensures that we cannot later attempt to fail the transaction again, decrementing the count of FlowFiles for the connection more than w [...]

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit de03b7dbc047df8ca38b175080af31b8fd2edb49
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Dec 2 11:21:36 2021 -0500

    NIFI-9433: When a Connection is unregistered from the NioAsyncLoadBalanceClient, make sure that we only cancel its active transaction if the transaction belongs to the appropriate connection. Also ensure that when we do cancel a transaction / call its failure callback, we purge the collection of any FlowFiles that have been sent in that transaction. This ensures that we cannot later attempt to fail the transaction again, decrementing the count of FlowFiles for the connection more t [...]
---
 .../clustered/client/async/nio/LoadBalanceSession.java      |  7 ++++---
 .../client/async/nio/NioAsyncLoadBalanceClient.java         | 13 +++++++------
 .../clustered/client/async/nio/TestLoadBalanceSession.java  |  4 ++--
 3 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
index 011558b..4178e55 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
@@ -43,7 +43,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.OptionalInt;
 import java.util.concurrent.TimeUnit;
@@ -119,8 +118,10 @@ public class LoadBalanceSession {
         return phase.getRequiredSelectionKey();
     }
 
-    public synchronized List<FlowFileRecord> getFlowFilesSent() {
-        return Collections.unmodifiableList(flowFilesSent);
+    public synchronized List<FlowFileRecord> getAndPurgeFlowFilesSent() {
+        final List<FlowFileRecord> copy = new ArrayList<>(flowFilesSent);
+        flowFilesSent.clear();
+        return copy;
     }
 
     public synchronized boolean isComplete() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index f88e0ef..a322b24 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -129,11 +129,12 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
         }
 
         logger.debug("{} Unregistered Connection with ID {}. Will fail any in-flight FlowFiles for Registered Partition {}", this, connectionId, removedPartition);
-        if (loadBalanceSession != null && !loadBalanceSession.isComplete()) {
+        final boolean validSession = loadBalanceSession != null && connectionId.equals(loadBalanceSession.getPartition().getConnectionId());
+        if (validSession && !loadBalanceSession.isComplete()) {
             // Attempt to cancel the session. If successful, trigger the failure callback for the partition.
             // If not successful, it indicates that another thread has completed the session and is responsible or the transaction success/failure
             if (loadBalanceSession.cancel()) {
-                final List<FlowFileRecord> flowFilesSent = loadBalanceSession.getFlowFilesSent();
+                final List<FlowFileRecord> flowFilesSent = loadBalanceSession.getAndPurgeFlowFilesSent();
 
                 logger.debug("{} Triggering failure callback for {} FlowFiles for Registered Partition {} because partition was unregistered", this, flowFilesSent.size(), removedPartition);
                 removedPartition.getFailureCallback().onTransactionFailed(flowFilesSent, TransactionFailureCallback.TransactionPhase.SENDING);
@@ -268,7 +269,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
                         loadBalanceSession.getPartition().getConnectionId() + " due to " + e);
 
                     penalize();
-                    loadBalanceSession.getPartition().getFailureCallback().onTransactionFailed(loadBalanceSession.getFlowFilesSent(), e, TransactionFailureCallback.TransactionPhase.SENDING);
+                    loadBalanceSession.getPartition().getFailureCallback().onTransactionFailed(loadBalanceSession.getAndPurgeFlowFilesSent(), e, TransactionFailureCallback.TransactionPhase.SENDING);
                     close();
 
                     return false;
@@ -278,7 +279,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
             } while (success);
 
             if (loadBalanceSession.isComplete() && !loadBalanceSession.isCanceled()) {
-                loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent(), nodeIdentifier);
+                loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getAndPurgeFlowFilesSent(), nodeIdentifier);
             }
 
             return anySuccess;
@@ -311,10 +312,10 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
                 loadBalanceSession = null;
 
                 logger.debug("Node {} disconnected so will terminate the Load Balancing Session", nodeIdentifier);
-                final List<FlowFileRecord> flowFilesSent = session.getFlowFilesSent();
+                final List<FlowFileRecord> flowFilesSent = session.getAndPurgeFlowFilesSent();
 
                 if (!flowFilesSent.isEmpty()) {
-                    session.getPartition().getFailureCallback().onTransactionFailed(session.getFlowFilesSent(), TransactionFailureCallback.TransactionPhase.SENDING);
+                    session.getPartition().getFailureCallback().onTransactionFailed(flowFilesSent, TransactionFailureCallback.TransactionPhase.SENDING);
                 }
 
                 close();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
index 2ae0f0b..43a3cfe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
@@ -190,7 +190,7 @@ public class TestLoadBalanceSession {
 
         assertArrayEquals(expectedSent, dataSent);
 
-        assertEquals(Arrays.asList(flowFile1, flowFile2), transaction.getFlowFilesSent());
+        assertEquals(Arrays.asList(flowFile1, flowFile2), transaction.getAndPurgeFlowFilesSent());
     }
 
 
@@ -271,6 +271,6 @@ public class TestLoadBalanceSession {
 
         assertArrayEquals(expectedSent, dataSent);
 
-        assertEquals(Arrays.asList(flowFile1), transaction.getFlowFilesSent());
+        assertEquals(Arrays.asList(flowFile1), transaction.getAndPurgeFlowFilesSent());
     }
 }