You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/10/11 13:52:56 UTC
[5/6] nifi git commit: NIFI-5585: Adjustments to the Connection Load Balancing to ensure that node offloading works smoothly
NIFI-5585: Adjustments to the Connection Load Balancing to ensure that node offloading works smoothly
Signed-off-by: Jeff Storck <jt...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1a4c997
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1a4c997
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1a4c997
Branch: refs/heads/master
Commit: a1a4c997634aa7edabda42407a0a7627d33e73fd
Parents: 01e2098
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Oct 8 09:53:14 2018 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Oct 11 09:23:01 2018 -0400
----------------------------------------------------------------------
.../client/async/AsyncLoadBalanceClient.java | 2 ++
.../async/nio/NioAsyncLoadBalanceClient.java | 4 ++++
.../nio/NioAsyncLoadBalanceClientRegistry.java | 20 ++++++++++++++++----
.../nio/NioAsyncLoadBalanceClientTask.java | 8 ++------
4 files changed, 24 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1a4c997/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java
index 1bb4053..8673a8b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java
@@ -39,6 +39,8 @@ public interface AsyncLoadBalanceClient {
void unregister(String connectionId);
+ int getRegisteredConnectionCount();
+
boolean isRunning();
boolean isPenalized();
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1a4c997/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index 066b597..753c1f4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -119,6 +119,10 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
registeredPartitions.remove(connectionId);
}
+ public synchronized int getRegisteredConnectionCount() {
+ return registeredPartitions.size();
+ }
+
private synchronized Map<String, RegisteredPartition> getRegisteredPartitions() {
return new HashMap<>(registeredPartitions);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1a4c997/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
index 514a58c..3322035 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
@@ -67,15 +67,27 @@ public class NioAsyncLoadBalanceClientRegistry implements AsyncLoadBalanceClient
@Override
public synchronized void unregister(final String connectionId, final NodeIdentifier nodeId) {
- final Set<AsyncLoadBalanceClient> clients = clientMap.remove(nodeId);
+ final Set<AsyncLoadBalanceClient> clients = clientMap.get(nodeId);
if (clients == null) {
return;
}
- clients.forEach(client -> client.unregister(connectionId));
+ final Set<AsyncLoadBalanceClient> toRemove = new HashSet<>();
+ for (final AsyncLoadBalanceClient client : clients) {
+ client.unregister(connectionId);
+ if (client.getRegisteredConnectionCount() == 0) {
+ toRemove.add(client);
+ }
+ }
+
+ clients.removeAll(toRemove);
+ allClients.removeAll(toRemove);
+
+ if (clients.isEmpty()) {
+ clientMap.remove(nodeId);
+ }
- allClients.removeAll(clients);
- logger.debug("Un-registered Connection with ID {} so that it will no longer send data to Node {}", connectionId, nodeId);
+ logger.debug("Un-registered Connection with ID {} so that it will no longer send data to Node {}; {} clients were removed", connectionId, nodeId, toRemove.size());
}
private Set<AsyncLoadBalanceClient> registerClients(final NodeIdentifier nodeId) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1a4c997/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
index 35ea5f9..5c8073a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
@@ -66,13 +66,9 @@ public class NioAsyncLoadBalanceClientTask implements Runnable {
}
final NodeConnectionState connectionState = connectionStatus.getState();
- if (connectionState == NodeConnectionState.DISCONNECTED || connectionState == NodeConnectionState.DISCONNECTING) {
- client.nodeDisconnected();
- continue;
- }
-
if (connectionState != NodeConnectionState.CONNECTED) {
- logger.debug("Client {} is for node that is not currently connected (state = {}) so will not communicate with node", client, connectionState);
+ logger.debug("Notifying Client {} that node is not connected because current state is {}", client, connectionState);
+ client.nodeDisconnected();
continue;
}