You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/10/11 13:52:56 UTC

[5/6] nifi git commit: NIFI-5585: Adjustments to the Connection Load Balancing to ensure that node offloading works smoothly

NIFI-5585: Adjustments to the Connection Load Balancing to ensure that node offloading works smoothly

Signed-off-by: Jeff Storck <jt...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1a4c997
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1a4c997
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1a4c997

Branch: refs/heads/master
Commit: a1a4c997634aa7edabda42407a0a7627d33e73fd
Parents: 01e2098
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Oct 8 09:53:14 2018 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Oct 11 09:23:01 2018 -0400

----------------------------------------------------------------------
 .../client/async/AsyncLoadBalanceClient.java    |  2 ++
 .../async/nio/NioAsyncLoadBalanceClient.java    |  4 ++++
 .../nio/NioAsyncLoadBalanceClientRegistry.java  | 20 ++++++++++++++++----
 .../nio/NioAsyncLoadBalanceClientTask.java      |  8 ++------
 4 files changed, 24 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a1a4c997/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java
index 1bb4053..8673a8b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java
@@ -39,6 +39,8 @@ public interface AsyncLoadBalanceClient {
 
     void unregister(String connectionId);
 
+    int getRegisteredConnectionCount();
+
     boolean isRunning();
 
     boolean isPenalized();

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1a4c997/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index 066b597..753c1f4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -119,6 +119,10 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
         registeredPartitions.remove(connectionId);
     }
 
+    public synchronized int getRegisteredConnectionCount() {
+        return registeredPartitions.size();
+    }
+
     private synchronized Map<String, RegisteredPartition> getRegisteredPartitions() {
         return new HashMap<>(registeredPartitions);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1a4c997/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
index 514a58c..3322035 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
@@ -67,15 +67,27 @@ public class NioAsyncLoadBalanceClientRegistry implements AsyncLoadBalanceClient
 
     @Override
     public synchronized void unregister(final String connectionId, final NodeIdentifier nodeId) {
-        final Set<AsyncLoadBalanceClient> clients = clientMap.remove(nodeId);
+        final Set<AsyncLoadBalanceClient> clients = clientMap.get(nodeId);
         if (clients == null) {
             return;
         }
 
-        clients.forEach(client -> client.unregister(connectionId));
+        final Set<AsyncLoadBalanceClient> toRemove = new HashSet<>();
+        for (final AsyncLoadBalanceClient client : clients) {
+            client.unregister(connectionId);
+            if (client.getRegisteredConnectionCount() == 0) {
+                toRemove.add(client);
+            }
+        }
+
+        clients.removeAll(toRemove);
+        allClients.removeAll(toRemove);
+
+        if (clients.isEmpty()) {
+            clientMap.remove(nodeId);
+        }
 
-        allClients.removeAll(clients);
-        logger.debug("Un-registered Connection with ID {} so that it will no longer send data to Node {}", connectionId, nodeId);
+        logger.debug("Un-registered Connection with ID {} so that it will no longer send data to Node {}; {} clients were removed", connectionId, nodeId, toRemove.size());
     }
 
     private Set<AsyncLoadBalanceClient> registerClients(final NodeIdentifier nodeId) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1a4c997/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
index 35ea5f9..5c8073a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
@@ -66,13 +66,9 @@ public class NioAsyncLoadBalanceClientTask implements Runnable {
                     }
 
                     final NodeConnectionState connectionState = connectionStatus.getState();
-                    if (connectionState == NodeConnectionState.DISCONNECTED || connectionState == NodeConnectionState.DISCONNECTING) {
-                        client.nodeDisconnected();
-                        continue;
-                    }
-
                     if (connectionState != NodeConnectionState.CONNECTED) {
-                        logger.debug("Client {} is for node that is not currently connected (state = {}) so will not communicate with node", client, connectionState);
+                        logger.debug("Notifying Client {} that node is not connected because current state is {}", client, connectionState);
+                        client.nodeDisconnected();
                         continue;
                     }