You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2018/10/09 12:15:55 UTC

nifi git commit: NIFI-5663: Ensure that when sorting Node Identifiers we use both the node's API Address and its API Port, in case 2 nodes are running on the same host. Also ensure that when the Local Node ID is determined we update all Load Balancing Partitions, if necessary

Repository: nifi
Updated Branches:
  refs/heads/master 768bcfb50 -> c87d79193


NIFI-5663: Ensure that when sorting Node Identifiers we use both the node's API Address and its API Port, in case 2 nodes are running on the same host. Also ensure that when the Local Node ID is determined we update all Load Balancing Partitions, if necessary

This closes #3048.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c87d7919
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c87d7919
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c87d7919

Branch: refs/heads/master
Commit: c87d791938562de04ee598ebffa296f954130ca7
Parents: 768bcfb
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Oct 5 12:06:39 2018 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Tue Oct 9 21:14:31 2018 +0900

----------------------------------------------------------------------
 .../clustered/SocketLoadBalancedFlowFileQueue.java    | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c87d7919/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index f250200..193a961 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -571,7 +571,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
 
             // Re-define 'queuePartitions' array
             final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(updatedNodeIdentifiers);
-            sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress));
+            sortedNodeIdentifiers.sort(Comparator.comparing(nodeId -> nodeId.getApiAddress() + ":" + nodeId.getApiPort()));
 
             final QueuePartition[] updatedQueuePartitions;
             if (sortedNodeIdentifiers.isEmpty()) {
@@ -990,6 +990,14 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
                     return;
                 }
 
+                if (!nodeIdentifiers.contains(localNodeId)) {
+                    final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers);
+                    updatedNodeIds.add(localNodeId);
+
+                    logger.debug("Local Node Identifier has now been determined to be {}. Adding to set of Node Identifiers for {}", localNodeId, SocketLoadBalancedFlowFileQueue.this);
+                    setNodeIdentifiers(updatedNodeIds, false);
+                }
+
                 logger.debug("Local Node Identifier set to {}; current partitions = {}", localNodeId, queuePartitions);
 
                 for (final QueuePartition partition : queuePartitions) {
@@ -1009,7 +1017,9 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
                         logger.debug("{} Local Node Identifier set to {} and found Queue Partition {} with that Node Identifier. Will force update of partitions",
                                 SocketLoadBalancedFlowFileQueue.this, localNodeId, partition);
 
-                        setNodeIdentifiers(SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers, true);
+                        final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers);
+                        updatedNodeIds.add(localNodeId);
+                        setNodeIdentifiers(updatedNodeIds, true);
                         return;
                     }
                 }