You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/11/12 16:45:00 UTC

[nifi] branch main updated: NIFI-7999: Do not call NioAsyncLoadBalanceClient.nodeDisconnected() if node was already in a disconnected state. Doing so was resulting in that method being called constantly on startup, and with the synchronization in place that can result in a huge performance hit on startup. Also updated RemoteQueuePartition to move a small predicate into its own method. This was done because the predicate was previously defined within a synchronized method, which meant that invoking that predicate requ [...]

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c79ad15  NIFI-7999: Do not call NioAsyncLoadBalanceClient.nodeDisconnected() if node was already in a disconnected state. Doing so was resulting in that method being called constantly on startup, and with the synchronization in place that can result in a huge performance hit on startup. Also updated RemoteQueuePartition to move a small predicate into its own method. This was done because the predicate was previously defined within a synchronized method, which meant that invoking  [...]
c79ad15 is described below

commit c79ad1502e7938822e5c10693c294538996f5a61
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Nov 12 10:35:21 2020 -0500

    NIFI-7999: Do not call NioAsyncLoadBalanceClient.nodeDisconnected() if node was already in a disconnected state. Doing so was resulting in that method being called constantly on startup, and with the synchronization in place that can result in a huge performance hit on startup. Also updated RemoteQueuePartition to move a small predicate into its own method. This was done because the predicate was previously defined within a synchronized method, which meant that invoking that predicate [...]
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4657.
---
 .../clustered/client/async/nio/NioAsyncLoadBalanceClient.java      | 2 +-
 .../clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java  | 7 ++++++-
 .../controller/queue/clustered/partition/RemoteQueuePartition.java | 6 +++++-
 3 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index 1257a3c..ac2a561 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -357,7 +357,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
 
             return null;
         } finally {
-            polledPartitions.forEach(partitionQueue::offer);
+            partitionQueue.addAll(polledPartitions);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
index 5636e22..b699052 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
@@ -27,6 +27,9 @@ import org.apache.nifi.reporting.Severity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class NioAsyncLoadBalanceClientTask implements Runnable {
     private static final Logger logger = LoggerFactory.getLogger(NioAsyncLoadBalanceClientTask.class);
     private static final String EVENT_CATEGORY = "Load-Balanced Connection";
@@ -34,6 +37,7 @@ public class NioAsyncLoadBalanceClientTask implements Runnable {
     private final NioAsyncLoadBalanceClientRegistry clientRegistry;
     private final ClusterCoordinator clusterCoordinator;
     private final EventReporter eventReporter;
+    private final Map<NodeIdentifier, NodeConnectionState> nodeConnectionStates = new HashMap<>();
     private volatile boolean running = true;
 
     public NioAsyncLoadBalanceClientTask(final NioAsyncLoadBalanceClientRegistry clientRegistry, final ClusterCoordinator clusterCoordinator, final EventReporter eventReporter) {
@@ -66,7 +70,8 @@ public class NioAsyncLoadBalanceClientTask implements Runnable {
                     }
 
                     final NodeConnectionState connectionState = connectionStatus.getState();
-                    if (connectionState != NodeConnectionState.CONNECTED) {
+                    final NodeConnectionState previousState = nodeConnectionStates.put(client.getNodeIdentifier(), connectionState);
+                    if (connectionState != NodeConnectionState.CONNECTED && previousState == NodeConnectionState.CONNECTED) {
                         logger.debug("Notifying Client {} that node is not connected because current state is {}", client, connectionState);
                         client.nodeDisconnected();
                         continue;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
index 144a043..d25203a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
@@ -220,13 +220,17 @@ public class RemoteQueuePartition implements QueuePartition {
         // determine that now FlowFile is available to send, and then notify the node of this and close the connection. And then this would repeat over and over
         // until the FlowFile is no longer penalized. Instead, we want to consider the queue empty until a FlowFile is actually available, and only then bother
         // creating the connection to send data.
-        final BooleanSupplier emptySupplier = () -> !priorityQueue.isFlowFileAvailable();
+        final BooleanSupplier emptySupplier = this::isQueueEmpty;
         clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, emptySupplier, this::getFlowFile,
             failureCallback, successCallback, flowFileQueue::getLoadBalanceCompression, flowFileQueue::isPropagateBackpressureAcrossNodes);
 
         running = true;
     }
 
+    private boolean isQueueEmpty() {
+        return !priorityQueue.isFlowFileAvailable();
+    }
+
     public void onRemoved() {
         clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier);
     }