You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/07/18 14:14:02 UTC

nifi git commit: NIFI-2289: Directly ask ZooKeeper which node is the cluster coordinator and add watches on the ZNode, rather than relying on Node Status Updates sent over the cluster protocol, because the cluster protocol may deliver those events out of order

Repository: nifi
Updated Branches:
  refs/heads/master 01cae2374 -> 5c8636edf


NIFI-2289: Directly ask ZooKeeper which node is the cluster coordinator and add watches on the ZNode, rather than relying on Node Status Updates sent over the cluster protocol, because the cluster protocol may deliver those events out of order

This closes #665.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5c8636ed
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5c8636ed
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5c8636ed

Branch: refs/heads/master
Commit: 5c8636edf4bd8ee32892440794e9b40bd009abb6
Parents: 01cae23
Author: Mark Payne <ma...@hotmail.com>
Authored: Sat Jul 16 21:19:44 2016 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Jul 18 10:13:42 2016 -0400

----------------------------------------------------------------------
 .../node/NodeClusterCoordinator.java            | 94 ++++++++++++++++++--
 .../NodeClusterCoordinatorFactoryBean.java      |  2 +-
 .../node/TestNodeClusterCoordinator.java        | 15 +++-
 3 files changed, 100 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5c8636ed/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index d336558..5a74301 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -18,12 +18,14 @@
 package org.apache.nifi.cluster.coordination.node;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -32,6 +34,10 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryForever;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
 import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
@@ -55,11 +61,14 @@ import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
+import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.services.FlowService;
 import org.apache.nifi.web.revision.RevisionManager;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,19 +85,37 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     private final EventReporter eventReporter;
     private final ClusterNodeFirewall firewall;
     private final RevisionManager revisionManager;
+
+    // Curator used to determine which node is coordinator
+    private final CuratorFramework curatorClient;
+    private final String nodesPathPrefix;
+    private final String coordinatorPath;
+
     private volatile FlowService flowService;
     private volatile boolean connected;
+    private volatile String coordinatorAddress;
 
     private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
     private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
 
-    public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener,
-        final EventReporter eventReporter, final ClusterNodeFirewall firewall, final RevisionManager revisionManager) {
+    public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter,
+        final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final Properties nifiProperties) {
         this.senderListener = senderListener;
         this.flowService = null;
         this.eventReporter = eventReporter;
         this.firewall = firewall;
         this.revisionManager = revisionManager;
+
+        final RetryPolicy retryPolicy = new RetryForever(5000);
+        final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties);
+
+        curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
+            zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy);
+
+        curatorClient.start();
+        nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
+        coordinatorPath = nodesPathPrefix + "/coordinator";
+
         senderListener.addHandler(this);
     }
 
@@ -101,6 +128,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
         updateNodeStatus(shutdownStatus);
         logger.info("Successfully notified other nodes that I am shutting down");
+
+        curatorClient.close();
     }
 
     @Override
@@ -128,9 +157,33 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         return localNodeId;
     }
 
+    private String getElectedActiveCoordinatorAddress() throws IOException {
+        final String curAddress = coordinatorAddress;
+        if (curAddress != null) {
+            return curAddress;
+        }
+
+        try {
+            // Get coordinator address and add watcher to change who we are heartbeating to if the value changes.
+            final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() {
+                @Override
+                public void process(final WatchedEvent event) {
+                    coordinatorAddress = null;
+                }
+            }).forPath(coordinatorPath);
+            final String address = coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);
+
+            logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);
+            return address;
+        } catch (Exception e) {
+            throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
+        }
+    }
+
     @Override
     public void resetNodeStatuses(final Map<NodeIdentifier, NodeConnectionStatus> statusMap) {
         logger.info("Resetting cluster node statuses from {} to {}", nodeStatuses, statusMap);
+        coordinatorAddress = null;
 
         // For each proposed replacement, update the nodeStatuses map if and only if the replacement
         // has a larger update id than the current value.
@@ -455,13 +508,41 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
     @Override
     public NodeIdentifier getElectedActiveCoordinatorNode() {
+        final String electedNodeAddress;
+        try {
+            electedNodeAddress = getElectedActiveCoordinatorAddress();
+        } catch (final IOException ioe) {
+            logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe);
+            return null;
+        }
+
+        final int colonLoc = electedNodeAddress.indexOf(':');
+        if (colonLoc < 1) {
+            logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress);
+            return null;
+        }
+
+        final String electedNodeHostname = electedNodeAddress.substring(0, colonLoc);
+        final String portString = electedNodeAddress.substring(colonLoc + 1);
+        final int electedNodePort;
+        try {
+            electedNodePort = Integer.parseInt(portString);
+        } catch (final NumberFormatException nfe) {
+            logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress);
+            return null;
+        }
+
         final Set<NodeIdentifier> connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED);
-        return connectedNodeIds.stream()
-            .map(nodeId -> getConnectionStatus(nodeId))
-            .filter(status -> status.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR))
+        final NodeIdentifier electedNodeId = connectedNodeIds.stream()
+            .filter(nodeId -> nodeId.getSocketAddress().equals(electedNodeHostname) && nodeId.getSocketPort() == electedNodePort)
             .findFirst()
-            .map(status -> status.getNodeIdentifier())
             .orElse(null);
+
+        if (electedNodeId == null) {
+            logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address", electedNodeAddress);
+        }
+
+        return electedNodeId;
     }
 
     @Override
@@ -858,6 +939,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     @Override
     public void setConnected(final boolean connected) {
         this.connected = connected;
+        this.coordinatorAddress = null; // if connection state changed, we are not sure about the coordinator. Check for address again.
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/5c8636ed/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
index 6229ba1..b414e0d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
@@ -43,7 +43,7 @@ public class NodeClusterCoordinatorFactoryBean implements FactoryBean<NodeCluste
             final ClusterNodeFirewall clusterFirewall = applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class);
             final RevisionManager revisionManager = applicationContext.getBean("revisionManager", RevisionManager.class);
 
-            nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, clusterFirewall, revisionManager);
+            nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, clusterFirewall, revisionManager, properties);
         }
 
         return nodeClusterCoordinator;

http://git-wip-us.apache.org/repos/asf/nifi/blob/5c8636ed/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index e3d5295..06f4252 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -58,6 +59,12 @@ public class TestNodeClusterCoordinator {
     private ClusterCoordinationProtocolSenderListener senderListener;
     private List<NodeStatusChangeMessage> nodeStatusChangeMessages;
 
+    private Properties createProperties() {
+        final Properties props = new Properties();
+        props.put("nifi.zookeeper.connect.string", "localhost:2181");
+        return props;
+    }
+
     @Before
     @SuppressWarnings("unchecked")
     public void setup() throws IOException {
@@ -77,7 +84,7 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
         Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager);
+        coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties());
 
         final FlowService flowService = Mockito.mock(FlowService.class);
         final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
@@ -127,7 +134,7 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
         Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager);
+        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties());
 
         final NodeIdentifier requestedNodeId = createNodeId(6);
         final ConnectionRequest request = new ConnectionRequest(requestedNodeId);
@@ -161,7 +168,7 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
         Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager);
+        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties());
 
         final FlowService flowService = Mockito.mock(FlowService.class);
         final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
@@ -225,7 +232,7 @@ public class TestNodeClusterCoordinator {
         final EventReporter eventReporter = Mockito.mock(EventReporter.class);
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
         Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
-        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager);
+        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties());
 
         final FlowService flowService = Mockito.mock(FlowService.class);
         final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);