You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/04/04 18:29:02 UTC

[nifi] 07/31: NIFI-9835: Fixed threading bug in which NioAsyncLoadBalanceClient calls LoadBalanceSession.isComplete() followed by LoadBalanceSession.isCanceled() but it's possible for the complete flag to change before the canceled flag (they are not updated atomically). The fix replaces both flags with a single LoadBalanceSessionState enum that represents the state. Also made the private StandardProcessSession.commit(boolean) method synchronized. When a processor is terminated (as is the case in Offload), we roll bac [...]

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 854c419635f37dffc2c56f74b41fab259e06bd6c
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Mar 25 11:43:03 2022 -0400

    NIFI-9835: Fixed threading bug in which NioAsyncLoadBalanceClient calls LoadBalanceSession.isComplete() followed by LoadBalanceSession.isCanceled() but it's possible for the complete flag to change before the canceled flag (they are not updated atomically). The fix replaces both flags with a single LoadBalanceSessionState enum that represents the state. Also made the private StandardProcessSession.commit(boolean) method synchronized. When a processor is terminated (as is the case in Offload), we rol [...]
    
    This closes #5902
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../repository/StandardProcessSession.java         |  2 +-
 .../client/async/nio/LoadBalanceSession.java       | 72 ++++++++++------------
 .../async/nio/NioAsyncLoadBalanceClient.java       |  9 +--
 .../client/async/nio/TestLoadBalanceSession.java   |  2 +-
 .../nifi/processors/standard/MergeRecord.java      |  2 +-
 5 files changed, 42 insertions(+), 45 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index bf1a1ab1fd..3965b8683e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -543,7 +543,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
         LOG.debug("Successfully committed session {} for {}", this, connectableDescription);
     }
 
-    private void commit(final boolean asynchronous) {
+    private synchronized void commit(final boolean asynchronous) {
         checkpoint(this.checkpoint != null); // If a checkpoint already exists, we need to copy the collection
         commit(this.checkpoint, asynchronous);
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
index 8b068cd299..56283b91d0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
@@ -45,7 +45,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.OptionalInt;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
@@ -64,7 +63,6 @@ import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalancePro
 public class LoadBalanceSession {
     private static final Logger logger = LoggerFactory.getLogger(LoadBalanceSession.class);
     static final int MAX_DATA_FRAME_SIZE = 65535;
-    private static final long PENALTY_MILLIS = TimeUnit.SECONDS.toMillis(2L);
 
     private final RegisteredPartition partition;
     private final Supplier<FlowFileRecord> flowFileSupplier;
@@ -75,7 +73,6 @@ public class LoadBalanceSession {
     private final String peerDescription;
     private final String connectionId;
     private final TransactionThreshold transactionThreshold;
-    private volatile boolean canceled = false;
 
     final VersionNegotiator negotiator = new StandardVersionNegotiator(1);
     private int protocolVersion = 1;
@@ -85,13 +82,12 @@ public class LoadBalanceSession {
     // guarded by synchronizing on 'this'
     private ByteBuffer preparedFrame;
     private FlowFileRecord currentFlowFile;
-    private List<FlowFileRecord> flowFilesSent = new ArrayList<>();
+    private final List<FlowFileRecord> flowFilesSent = new ArrayList<>();
     private TransactionPhase phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
     private InputStream flowFileInputStream;
-    private byte[] byteBuffer = new byte[MAX_DATA_FRAME_SIZE];
-    private boolean complete = false;
+    private final byte[] byteBuffer = new byte[MAX_DATA_FRAME_SIZE];
     private long readTimeout;
-    private long penaltyExpiration = -1L;
+    private volatile LoadBalanceSessionState sessionState = LoadBalanceSessionState.ACTIVE;
 
     public LoadBalanceSession(final RegisteredPartition partition, final FlowFileContentAccess contentAccess, final LoadBalanceFlowFileCodec flowFileCodec, final PeerChannel peerChannel,
                               final int timeoutMillis, final TransactionThreshold transactionThreshold) {
@@ -124,17 +120,12 @@ public class LoadBalanceSession {
         return copy;
     }
 
-    public synchronized boolean isComplete() {
-        return complete;
+    public synchronized LoadBalanceSessionState getSessionState() {
+        return sessionState;
     }
 
     public synchronized boolean communicate() throws IOException {
-        if (isComplete()) {
-            return false;
-        }
-
-        if (isPenalized()) {
-            logger.debug("Will not communicate with Peer {} for Connection {} because session is penalized", peerDescription, connectionId);
+        if (sessionState.isComplete()) {
             return false;
         }
 
@@ -167,25 +158,20 @@ public class LoadBalanceSession {
             final int bytesWritten = channel.write(preparedFrame);
             return bytesWritten > 0;
         } catch (final Exception e) {
-            complete = true;
+            sessionState = LoadBalanceSessionState.COMPLETED_EXCEPTIONALLY;
             throw e;
         }
     }
 
     public synchronized boolean cancel() {
-        if (complete) {
+        if (sessionState.isComplete()) {
             return false;
         }
 
-        complete = true;
-        canceled = true;
+        sessionState = LoadBalanceSessionState.CANCELED;
         return true;
     }
 
-    public boolean isCanceled() {
-        return canceled;
-    }
-
     private boolean confirmTransactionComplete() throws IOException {
         logger.debug("Confirming Transaction Complete for Peer {}", peerDescription);
 
@@ -210,7 +196,7 @@ public class LoadBalanceSession {
             throw new IOException("Expected a CONFIRM_COMPLETE_TRANSACTION response from Peer " + peerDescription + " but received a value of " + response);
         }
 
-        complete = true;
+        sessionState = LoadBalanceSessionState.COMPLETED_SUCCESSFULLY;
         logger.debug("Successfully completed Transaction to send {} FlowFiles to Peer {} for Connection {}", flowFilesSent.size(), peerDescription, connectionId);
 
         return true;
@@ -496,21 +482,19 @@ public class LoadBalanceSession {
             protocolVersion = requestedVersion;
             phase = TransactionPhase.SEND_CONNECTION_ID;
             logger.debug("Peer {} recommended Protocol Version of {}. Accepting version.", peerDescription, requestedVersion);
-
-            return true;
         } else {
             final Integer preferred = negotiator.getPreferredVersion(requestedVersion);
             if (preferred == null) {
                 logger.debug("Peer {} requested version {} of the Load Balance Protocol. This version is not acceptable. Aborting communications.", peerDescription, requestedVersion);
                 phase = TransactionPhase.ABORT_PROTOCOL_NEGOTIATION;
-                return true;
             } else {
                 logger.debug("Peer {} requested version {} of the Protocol. Recommending version {} instead", peerDescription, requestedVersion, preferred);
                 protocolVersion = preferred;
                 phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
-                return true;
             }
         }
+
+        return true;
     }
 
     private ByteBuffer noMoreFlowFiles() {
@@ -592,7 +576,9 @@ public class LoadBalanceSession {
             logger.debug("Peer {} has confirmed that the queue is full for Connection {}", peerDescription, connectionId);
             phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
             checksum.reset(); // We are restarting the session entirely so we need to reset our checksum
-            complete = true; // consider complete because there's nothing else that we can do in this session. Allow client to move on to a different session.
+
+            // consider complete because there's nothing else that we can do in this session. Allow client to move on to a different session.
+            sessionState = LoadBalanceSessionState.COMPLETED_SUCCESSFULLY;
             partition.penalize(1000L);
         } else {
             throw new TransactionAbortedException("After requesting to know whether or not Peer " + peerDescription + " has space available in Connection " + connectionId
@@ -602,15 +588,6 @@ public class LoadBalanceSession {
         return true;
     }
 
-    private void penalize() {
-        penaltyExpiration = System.currentTimeMillis() + PENALTY_MILLIS;
-    }
-
-    private boolean isPenalized() {
-        // check for penaltyExpiration > -1L is not strictly necessary as it's implied by the second check but is still
-        // here because it's more efficient to check this than to make the system call to System.currentTimeMillis().
-        return penaltyExpiration > -1L && System.currentTimeMillis() < penaltyExpiration;
-    }
 
 
     private enum TransactionPhase {
@@ -653,4 +630,23 @@ public class LoadBalanceSession {
             return requiredSelectionKey;
         }
     }
+
+    public enum LoadBalanceSessionState {
+        ACTIVE(false),
+
+        COMPLETED_SUCCESSFULLY(true),
+
+        COMPLETED_EXCEPTIONALLY(true),
+
+        CANCELED(true);
+
+        private final boolean complete;
+        LoadBalanceSessionState(final boolean complete) {
+            this.complete = complete;
+        }
+
+        public boolean isComplete() {
+            return complete;
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index 4412987216..3ff1ef0827 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -130,7 +130,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
 
         logger.debug("{} Unregistered Connection with ID {}. Will fail any in-flight FlowFiles for Registered Partition {}", this, connectionId, removedPartition);
         final boolean validSession = loadBalanceSession != null && connectionId.equals(loadBalanceSession.getPartition().getConnectionId());
-        if (validSession && !loadBalanceSession.isComplete()) {
+        if (validSession && !loadBalanceSession.getSessionState().isComplete()) {
             // Attempt to cancel the session. If successful, trigger the failure callback for the partition.
             // If not successful, it indicates that another thread has completed the session and is responsible or the transaction success/failure
             if (loadBalanceSession.cancel()) {
@@ -278,7 +278,8 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
                 anySuccess = anySuccess || success;
             } while (success);
 
-            if (loadBalanceSession.isComplete() && !loadBalanceSession.isCanceled()) {
+            final LoadBalanceSession.LoadBalanceSessionState sessionState = loadBalanceSession.getSessionState();
+            if (sessionState.isComplete() && sessionState != LoadBalanceSession.LoadBalanceSessionState.CANCELED) {
                 loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getAndPurgeFlowFilesSent(), nodeIdentifier);
             }
 
@@ -356,7 +357,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
     }
 
     private synchronized LoadBalanceSession getFailoverSession() {
-        if (loadBalanceSession != null && !loadBalanceSession.isComplete()) {
+        if (loadBalanceSession != null && !loadBalanceSession.getSessionState().isComplete()) {
             return loadBalanceSession;
         }
 
@@ -402,7 +403,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
     }
 
     private synchronized LoadBalanceSession getActiveTransaction(final RegisteredPartition proposedPartition) {
-        if (loadBalanceSession != null && !loadBalanceSession.isComplete()) {
+        if (loadBalanceSession != null && !loadBalanceSession.getSessionState().isComplete()) {
             return loadBalanceSession;
         }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
index 43a3cfe3ec..d43ee2be44 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
@@ -136,7 +136,7 @@ public class TestLoadBalanceSession {
         while (transaction.communicate()) {
         }
 
-        assertTrue(transaction.isComplete());
+        assertTrue(transaction.getSessionState().isComplete());
         socketChannel.close();
 
         final Checksum expectedChecksum = new CRC32();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index ce38273d65..187bcec634 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -195,7 +195,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
         .name("max.bin.count")
         .displayName("Maximum Number of Bins")
         .description("Specifies the maximum number of bins that can be held in memory at any one time. "
-            + "This number should not be smaller than the maximum number of conurrent threads for this Processor, "
+            + "This number should not be smaller than the maximum number of concurrent threads for this Processor, "
             + "or the bins that are created will often consist only of a single incoming FlowFile.")
         .defaultValue("10")
         .required(true)