You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/07/23 04:33:47 UTC

nifi git commit: NIFI-2360: Leave ZooKeeper running when a node is disconnected. Do not allow the last node in the cluster to be disconnected. Change ClusterProtocolHeartbeater to use RetryNTimes retry strategy instead of RetryForever because web requests could block on this

Repository: nifi
Updated Branches:
  refs/heads/master 393a3925d -> 6932a53ec


NIFI-2360: Leave ZooKeeper running when a node is disconnected. Do not allow the last connected node in the cluster to be disconnected. Change ClusterProtocolHeartbeater and NodeClusterCoordinator to use the RetryNTimes retry strategy instead of RetryForever, because web requests could block indefinitely on it.

This closes #705

Signed-off-by: jpercivall <jo...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6932a53e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6932a53e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6932a53e

Branch: refs/heads/master
Commit: 6932a53ec91513e7c300fdfba2a79c45614a8cea
Parents: 393a392
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jul 22 09:21:12 2016 -0400
Committer: jpercivall <jo...@yahoo.com>
Committed: Fri Jul 22 23:57:00 2016 -0400

----------------------------------------------------------------------
 .../node/NodeClusterCoordinator.java            |  22 ++--
 .../IllegalNodeDisconnectionException.java      |   3 +-
 .../node/TestNodeClusterCoordinator.java        |  34 +++++-
 .../apache/nifi/controller/FlowController.java  | 104 ++++++-------------
 .../cluster/ClusterProtocolHeartbeater.java     |   4 +-
 pom.xml                                         |   4 +-
 6 files changed, 76 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6932a53e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 04f8dfa..3f8fa76 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -37,7 +37,7 @@ import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryForever;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
 import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
@@ -46,6 +46,7 @@ import org.apache.nifi.cluster.event.Event;
 import org.apache.nifi.cluster.event.NodeEvent;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
 import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
 import org.apache.nifi.cluster.protocol.ComponentRevision;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
@@ -61,8 +62,8 @@ import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.services.FlowService;
@@ -106,7 +107,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         this.firewall = firewall;
         this.revisionManager = revisionManager;
 
-        final RetryPolicy retryPolicy = new RetryForever(5000);
+        final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
         final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties);
 
         curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
@@ -278,6 +279,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
     @Override
     public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+        final int numConnected = getNodeIdentifiers(NodeConnectionState.CONNECTED).size();
+        if (numConnected == 1) {
+            throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeId + " because it is the only node currently connected");
+        }
+
         logger.info("Requesting that {} disconnect due to {}", nodeId, explanation == null ? disconnectionCode : explanation);
 
         updateNodeStatus(new NodeConnectionStatus(nodeId, disconnectionCode, explanation));
@@ -548,15 +554,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     @Override
     public boolean isActiveClusterCoordinator() {
         final NodeIdentifier self = getLocalNodeIdentifier();
-        if (self == null) {
-            return false;
-        }
-
-        final NodeConnectionStatus selfStatus = getConnectionStatus(self);
-        if (selfStatus == null) {
-            return false;
-        }
-        return selfStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR);
+        return self != null && self.equals(getElectedActiveCoordinatorNode());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/6932a53e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
index 1199ef7..e19a338 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
@@ -17,8 +17,7 @@
 package org.apache.nifi.cluster.manager.exception;
 
 /**
- * Represents the exceptional case when a disconnection request is issued to a node that cannot be disconnected (e.g., last node in cluster, node is primary node).
- *
+ * Represents the exceptional case when a disconnection request is issued to a node that cannot be disconnected (e.g., last node in cluster).
  */
 public class IllegalNodeDisconnectionException extends IllegalClusterStateException {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6932a53e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index 06f4252..25c55a0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -48,6 +49,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.services.FlowService;
 import org.apache.nifi.web.revision.RevisionManager;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -336,8 +338,9 @@ public class TestNodeClusterCoordinator {
     @Test(timeout = 5000)
     public void testRequestNodeDisconnect() throws InterruptedException {
         // Add a connected node
-        final NodeIdentifier nodeId = createNodeId(1);
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet()));
+        final NodeIdentifier nodeId1 = createNodeId(1);
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.CONNECTED, Collections.emptySet()));
 
         // wait for the status change message and clear it
         while (nodeStatusChangeMessages.isEmpty()) {
@@ -345,19 +348,40 @@ public class TestNodeClusterCoordinator {
         }
         nodeStatusChangeMessages.clear();
 
-        coordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
-        assertEquals(NodeConnectionState.DISCONNECTED, coordinator.getConnectionStatus(nodeId).getState());
+        coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
+        assertEquals(NodeConnectionState.DISCONNECTED, coordinator.getConnectionStatus(nodeId1).getState());
 
         while (nodeStatusChangeMessages.isEmpty()) {
             Thread.sleep(10L);
         }
         final NodeStatusChangeMessage msg = nodeStatusChangeMessages.get(0);
-        assertEquals(nodeId, msg.getNodeId());
+        assertEquals(nodeId1, msg.getNodeId());
         assertEquals(NodeConnectionState.DISCONNECTED, msg.getNodeConnectionStatus().getState());
     }
 
 
     @Test(timeout = 5000)
+    public void testCannotDisconnectLastNode() throws InterruptedException {
+        // Add a connected node
+        final NodeIdentifier nodeId1 = createNodeId(1);
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
+
+        // wait for the status change message and clear it
+        while (nodeStatusChangeMessages.isEmpty()) {
+            Thread.sleep(10L);
+        }
+        nodeStatusChangeMessages.clear();
+
+        try {
+            coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
+            Assert.fail("Expected an IllegalNodeDisconnectionException when trying to disconnect last node but it wasn't thrown");
+        } catch (final IllegalNodeDisconnectionException inde) {
+            // expected
+        }
+    }
+
+
+    @Test(timeout = 5000)
     public void testUpdateNodeStatusOutOfOrder() throws InterruptedException {
         // Add a connected node
         final NodeIdentifier nodeId1 = createNodeId(1);

http://git-wip-us.apache.org/repos/asf/nifi/blob/6932a53e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index abe5e27..d6e9308 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -18,6 +18,38 @@ package org.apache.nifi.controller;
 
 import com.sun.jersey.api.client.ClientHandlerException;
 import org.apache.commons.collections4.Predicate;
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.admin.service.AuditService;
@@ -206,37 +238,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.LockSupport;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.Objects.requireNonNull;
 
 public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider {
 
@@ -3333,51 +3334,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     leaderElectionManager.start();
                     stateManagerProvider.enableClusterProvider();
 
-                    // Start ZooKeeper State Server if necessary
-                    if (zooKeeperStateServer != null) {
-                        processScheduler.submitFrameworkTask(new Runnable() {
-                            @Override
-                            public void run() {
-                                try {
-                                    zooKeeperStateServer.start();
-                                } catch (final Exception e) {
-                                    LOG.error("NiFi was connected to the cluster but failed to start embedded ZooKeeper Server", e);
-                                    final Bulletin bulletin = BulletinFactory.createBulletin("Embedded ZooKeeper Server", Severity.ERROR.name(),
-                                        "Unable to started embedded ZooKeeper Server. See logs for more details. Will continue trying to start embedded server.");
-                                    getBulletinRepository().addBulletin(bulletin);
-
-                                    // We failed to start the server. Wait a bit and try again.
-                                    try {
-                                        Thread.sleep(TimeUnit.SECONDS.toMillis(5));
-                                    } catch (final InterruptedException ie) {
-                                        // If we are interrupted, stop trying.
-                                        Thread.currentThread().interrupt();
-                                        return;
-                                    }
-
-                                    processScheduler.submitFrameworkTask(this);
-                                }
-                            }
-                        });
-
-                        // Give the server just a bit to start up, so that we don't get connection
-                        // failures on startup if we are using the embedded ZooKeeper server. We need to launch
-                        // the ZooKeeper Server in the background because ZooKeeper blocks indefinitely when we start
-                        // the server. Unfortunately, we have no way to know when it's up & ready. So we wait 1 second.
-                        // We could still get connection failures if we are on a slow machine but this at least makes it far
-                        // less likely. If we do get connection failures, we will still reconnect, but we will get bulletins
-                        // showing failures. This 1-second sleep is an attempt to at least make that occurrence rare.
-                        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
-                    }
-
                     heartbeat();
                 } else {
                     leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE);
                     leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR);
-
-                    if (zooKeeperStateServer != null) {
-                        zooKeeperStateServer.shutdown();
-                    }
                     stateManagerProvider.disableClusterProvider();
 
                     setPrimary(false);

http://git-wip-us.apache.org/repos/asf/nifi/blob/6932a53e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
index 0ce9a13..0240318 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
@@ -24,7 +24,7 @@ import java.util.Properties;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryForever;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@@ -52,7 +52,7 @@ public class ClusterProtocolHeartbeater implements Heartbeater {
     public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, final Properties properties) {
         this.protocolSender = protocolSender;
 
-        final RetryPolicy retryPolicy = new RetryForever(5000);
+        final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
         final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(properties);
 
         curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),

http://git-wip-us.apache.org/repos/asf/nifi/blob/6932a53e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b852c8f..24ef0a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -772,12 +772,12 @@ language governing permissions and limitations under the License. -->
             <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-framework</artifactId>
-                <version>2.10.0</version>
+                <version>2.11.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-recipes</artifactId>
-                <version>2.10.0</version>
+                <version>2.11.0</version>
             </dependency>