You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/11/12 16:45:00 UTC
[nifi] branch main updated: NIFI-7999: Do not call NioAsyncLoadBalanceClient.nodeDisconnected() if node was already in a disconnected state. Doing so was resulting in that method being called constantly on startup, and with the synchronization in place that can result in a huge performance hit on startup. Also updated RemoteQueuePartition to move a small predicate into its own method. This was done because the predicate was previously defined within a synchronized method, which meant that invoking that predicate requ [...]
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c79ad15 NIFI-7999: Do not call NioAsyncLoadBalanceClient.nodeDisconnected() if node was already in a disconnected state. Doing so was resulting in that method being called constantly on startup, and with the synchronization in place that can result in a huge performance hit on startup. Also updated RemoteQueuePartition to move a small predicate into its own method. This was done because the predicate was previously defined within a synchronized method, which meant that invoking [...]
c79ad15 is described below
commit c79ad1502e7938822e5c10693c294538996f5a61
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Nov 12 10:35:21 2020 -0500
NIFI-7999: Do not call NioAsyncLoadBalanceClient.nodeDisconnected() if node was already in a disconnected state. Doing so was resulting in that method being called constantly on startup, and with the synchronization in place that can result in a huge performance hit on startup. Also updated RemoteQueuePartition to move a small predicate into its own method. This was done because the predicate was previously defined within a synchronized method, which meant that invoking that predicate [...]
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4657.
---
.../clustered/client/async/nio/NioAsyncLoadBalanceClient.java | 2 +-
.../clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java | 7 ++++++-
.../controller/queue/clustered/partition/RemoteQueuePartition.java | 6 +++++-
3 files changed, 12 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index 1257a3c..ac2a561 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -357,7 +357,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
return null;
} finally {
- polledPartitions.forEach(partitionQueue::offer);
+ partitionQueue.addAll(polledPartitions);
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
index 5636e22..b699052 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
@@ -27,6 +27,9 @@ import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
public class NioAsyncLoadBalanceClientTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(NioAsyncLoadBalanceClientTask.class);
private static final String EVENT_CATEGORY = "Load-Balanced Connection";
@@ -34,6 +37,7 @@ public class NioAsyncLoadBalanceClientTask implements Runnable {
private final NioAsyncLoadBalanceClientRegistry clientRegistry;
private final ClusterCoordinator clusterCoordinator;
private final EventReporter eventReporter;
+ private final Map<NodeIdentifier, NodeConnectionState> nodeConnectionStates = new HashMap<>();
private volatile boolean running = true;
public NioAsyncLoadBalanceClientTask(final NioAsyncLoadBalanceClientRegistry clientRegistry, final ClusterCoordinator clusterCoordinator, final EventReporter eventReporter) {
@@ -66,7 +70,8 @@ public class NioAsyncLoadBalanceClientTask implements Runnable {
}
final NodeConnectionState connectionState = connectionStatus.getState();
- if (connectionState != NodeConnectionState.CONNECTED) {
+ final NodeConnectionState previousState = nodeConnectionStates.put(client.getNodeIdentifier(), connectionState);
+ if (connectionState != NodeConnectionState.CONNECTED && previousState == NodeConnectionState.CONNECTED) {
logger.debug("Notifying Client {} that node is not connected because current state is {}", client, connectionState);
client.nodeDisconnected();
continue;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
index 144a043..d25203a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
@@ -220,13 +220,17 @@ public class RemoteQueuePartition implements QueuePartition {
// determine that no FlowFile is available to send, and then notify the node of this and close the connection. And then this would repeat over and over
// until the FlowFile is no longer penalized. Instead, we want to consider the queue empty until a FlowFile is actually available, and only then bother
// creating the connection to send data.
- final BooleanSupplier emptySupplier = () -> !priorityQueue.isFlowFileAvailable();
+ final BooleanSupplier emptySupplier = this::isQueueEmpty;
clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, emptySupplier, this::getFlowFile,
failureCallback, successCallback, flowFileQueue::getLoadBalanceCompression, flowFileQueue::isPropagateBackpressureAcrossNodes);
running = true;
}
+ private boolean isQueueEmpty() {
+ return !priorityQueue.isFlowFileAvailable();
+ }
+
public void onRemoved() {
clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier);
}