You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/04 18:28:52 UTC

[16/18] nifi git commit: NIFI-483: Use ZooKeeper's Leader Election to determine Primary Node. This closes #301

NIFI-483: Use ZooKeeper's Leader Election to determine Primary Node. This closes #301

Signed-off-by: Matt Gilman <ma...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1ac05266
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1ac05266
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1ac05266

Branch: refs/heads/master
Commit: 1ac05266a5cbab0c05551fc40201376cca13b540
Parents: 0d3bd2c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Mar 23 13:16:13 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Mon Apr 4 11:47:08 2016 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   6 +
 .../org/apache/nifi/util/NiFiProperties.java    |   9 +
 .../protocol/ClusterManagerProtocolSender.java  |   9 -
 .../impl/ClusterManagerProtocolSenderImpl.java  |  27 --
 .../ClusterManagerProtocolSenderListener.java   |   7 -
 .../protocol/jaxb/message/ObjectFactory.java    |   5 -
 .../message/PrimaryRoleAssignmentMessage.java   |  55 ----
 .../protocol/message/ProtocolMessage.java       |   1 -
 .../nifi/cluster/manager/ClusterManager.java    |  18 --
 .../cluster/manager/impl/WebClusterManager.java | 199 +------------
 .../nifi-framework/nifi-framework-core/pom.xml  |   9 +
 .../apache/nifi/controller/FlowController.java  | 103 ++++---
 .../nifi/controller/StandardFlowService.java    |  40 ---
 .../election/CuratorLeaderElectionManager.java  | 285 +++++++++++++++++++
 .../leader/election/LeaderElectionManager.java  |  71 +++++
 .../LeaderElectionStateChangeListener.java      |  35 +++
 .../src/main/resources/conf/nifi.properties     |   6 +
 .../nifi/web/StandardNiFiServiceFacade.java     |   6 -
 pom.xml                                         |  13 +
 19 files changed, 495 insertions(+), 409 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 0422b8e..09a8d50 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -456,6 +456,12 @@ language governing permissions and limitations under the License. -->
         <nifi.cluster.manager.protocol.threads>10</nifi.cluster.manager.protocol.threads>
         <nifi.cluster.manager.safemode.duration>0 sec</nifi.cluster.manager.safemode.duration>
 
+        <!--  nifi.properties: zookeeper properties -->
+        <nifi.zookeeper.connect.string></nifi.zookeeper.connect.string>
+        <nifi.zookeeper.connect.timeout>3 secs</nifi.zookeeper.connect.timeout>
+        <nifi.zookeeper.session.timeout>3 secs</nifi.zookeeper.session.timeout>
+        <nifi.zookeeper.root.node>/nifi</nifi.zookeeper.root.node>
+
         <!-- nifi.properties: kerberos properties -->
         <nifi.kerberos.krb5.file> </nifi.kerberos.krb5.file>
         <nifi.kerberos.service.principal />

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 8c98c0b..517b19a 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -174,6 +174,12 @@ public class NiFiProperties extends Properties {
     public static final String CLUSTER_NODE_UNICAST_MANAGER_ADDRESS = "nifi.cluster.node.unicast.manager.address";
     public static final String CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT = "nifi.cluster.node.unicast.manager.protocol.port";
 
+    // zookeeper properties
+    public static final String ZOOKEEPER_CONNECT_STRING = "nifi.zookeeper.connect.string";
+    public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout";
+    public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout";
+    public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node";
+
     // cluster manager properties
     public static final String CLUSTER_IS_MANAGER = "nifi.cluster.is.manager";
     public static final String CLUSTER_MANAGER_ADDRESS = "nifi.cluster.manager.address";
@@ -226,6 +232,9 @@ public class NiFiProperties extends Properties {
     public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state";
     public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins";
     public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis";
+    public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs";
+    public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs";
+    public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi";
 
     // cluster common defaults
     public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec";

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
index 10653ff..bdefbbf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
@@ -19,7 +19,6 @@ package org.apache.nifi.cluster.protocol;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.reporting.BulletinRepository;
@@ -57,14 +56,6 @@ public interface ClusterManagerProtocolSender {
     void disconnect(DisconnectMessage msg) throws ProtocolException;
 
     /**
-     * Sends an "assign primary role" message to a node.
-     *
-     * @param msg a message
-     * @throws ProtocolException if communication failed
-     */
-    void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException;
-
-    /**
      * Sets the {@link BulletinRepository} that can be used to report bulletins
      *
      * @param bulletinRepository repo

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
index 636a6d3..fb9292e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
@@ -31,7 +31,6 @@ import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@@ -56,7 +55,6 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
     private final ProtocolContext<ProtocolMessage> protocolContext;
     private final SocketConfiguration socketConfiguration;
     private int handshakeTimeoutSeconds;
-    private volatile BulletinRepository bulletinRepository;
 
     public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
         if (socketConfiguration == null) {
@@ -71,7 +69,6 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
 
     @Override
     public void setBulletinRepository(final BulletinRepository bulletinRepository) {
-        this.bulletinRepository = bulletinRepository;
     }
 
     /**
@@ -183,30 +180,6 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
         }
     }
 
-    /**
-     * Assigns the primary role to a node.
-     *
-     * @param msg a message
-     *
-     * @throws ProtocolException if the message failed to be sent
-     */
-    @Override
-    public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException {
-        Socket socket = null;
-        try {
-            socket = createSocket(msg.getNodeId(), true);
-
-            try {
-                // marshal message to output stream
-                final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
-                marshaller.marshal(msg, socket.getOutputStream());
-            } catch (final IOException ioe) {
-                throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
-            }
-        } finally {
-            SocketUtils.closeQuietly(socket);
-        }
-    }
 
     private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException {
         // update socket timeout, if handshake timeout was set; otherwise use socket's current timeout

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
index 8eb83a4..54d33a8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
@@ -26,7 +26,6 @@ import org.apache.nifi.cluster.protocol.ProtocolListener;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.reporting.BulletinRepository;
@@ -108,10 +107,4 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
     public void disconnect(DisconnectMessage msg) throws ProtocolException {
         sender.disconnect(msg);
     }
-
-    @Override
-    public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException {
-        sender.assignPrimaryRole(msg);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index 516b67e..25041ce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -27,7 +27,6 @@ import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
@@ -92,8 +91,4 @@ public class ObjectFactory {
     public ControllerStartupFailureMessage createControllerStartupFailureMessage() {
         return new ControllerStartupFailureMessage();
     }
-
-    public PrimaryRoleAssignmentMessage createPrimaryRoleAssignmentMessage() {
-        return new PrimaryRoleAssignmentMessage();
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java
deleted file mode 100644
index 4b7563a..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.message;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
-
-/**
- */
-@XmlRootElement(name = "primaryRoleAssignmentMessage")
-public class PrimaryRoleAssignmentMessage extends ProtocolMessage {
-
-    private NodeIdentifier nodeId;
-
-    private boolean primary;
-
-    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
-    public NodeIdentifier getNodeId() {
-        return nodeId;
-    }
-
-    public void setNodeId(NodeIdentifier nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    public boolean isPrimary() {
-        return primary;
-    }
-
-    public void setPrimary(boolean primary) {
-        this.primary = primary;
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.PRIMARY_ROLE;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index f01efd8..5953e09 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -31,7 +31,6 @@ public abstract class ProtocolMessage {
         FLOW_RESPONSE,
         HEARTBEAT,
         PING,
-        PRIMARY_ROLE,
         RECONNECTION_REQUEST,
         RECONNECTION_RESPONSE,
         SERVICE_BROADCAST,

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
index 51de54b..27ada88 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
@@ -23,9 +23,7 @@ import org.apache.nifi.cluster.event.Event;
 import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
 import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
 import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
-import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
 import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
-import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.node.Node;
 import org.apache.nifi.cluster.node.Node.Status;
@@ -142,22 +140,6 @@ public interface ClusterManager extends NodeInformant {
     List<Event> getNodeEvents(final String nodeId);
 
     /**
-     * Revokes the primary role from the current primary node and assigns the primary role to given given node ID.
-     *
-     * If role revocation fails, then the current primary node is set to disconnected while retaining the primary role and no role assignment is performed.
-     *
-     * If role assignment fails, then the given node is set to disconnected and is given the primary role.
-     *
-     * @param nodeId the node identifier
-     * @param userDn the Distinguished Name of the user requesting that the Primary Node be assigned
-     *
-     * @throws UnknownNodeException if the node with the given identifier does not exist
-     * @throws IneligiblePrimaryNodeException if the node with the given identifier is not eligible to be the primary node
-     * @throws PrimaryRoleAssignmentException if the cluster was unable to change the primary role to the requested node
-     */
-    void setPrimaryNode(String nodeId, String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException;
-
-    /**
      * @return the primary node of the cluster or null if no primary node exists
      */
     Node getPrimaryNode();

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 303e98e..6f1bc2c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -93,12 +93,10 @@ import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
 import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
 import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
 import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
-import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
 import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
 import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException;
 import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
 import org.apache.nifi.cluster.manager.exception.NodeReconnectionException;
-import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
 import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.exception.UriConstructionException;
@@ -118,7 +116,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
@@ -551,9 +548,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     servicesBroadcaster.start();
                 }
 
-                // start in safe mode
-                executeSafeModeTask();
-
                 // Load and start running Reporting Tasks
                 final byte[] serializedReportingTasks = clusterDataFlow.getReportingTasks();
                 if (serializedReportingTasks != null && serializedReportingTasks.length > 0) {
@@ -1312,88 +1306,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-    /**
-     * Messages the node to have the primary role. If the messaging fails, then the node is marked as disconnected.
-     *
-     * @param nodeId the node ID to assign primary role
-     *
-     * @return true if primary role assigned; false otherwise
-     */
-    private boolean assignPrimaryRole(final NodeIdentifier nodeId) {
-        writeLock.lock();
-        try {
-            // create primary role message
-            final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage();
-            msg.setNodeId(nodeId);
-            msg.setPrimary(true);
-            logger.info("Attempting to assign primary role to node: " + nodeId);
-
-            // message
-            senderListener.assignPrimaryRole(msg);
-
-            logger.info("Assigned primary role to node: " + nodeId);
-            addBulletin(nodeId, Severity.INFO, "Node assigned primary role");
-
-            // true indicates primary role assigned
-            return true;
-
-        } catch (final ProtocolException ex) {
-
-            logger.warn("Failed attempt to assign primary role to node " + nodeId + " due to " + ex);
-            addBulletin(nodeId, Severity.ERROR, "Failed to assign primary role to node due to: " + ex);
-
-            // mark node as disconnected and log/record the event
-            final Node node = getRawNode(nodeId.getId());
-            node.setStatus(Status.DISCONNECTED);
-            addEvent(node.getNodeId(), "Disconnected because of failed attempt to assign primary role.");
-
-            addBulletin(nodeId, Severity.WARNING, "Node disconnected because of failed attempt to assign primary role");
-
-            // false indicates primary role failed to be assigned
-            return false;
-        } finally {
-            writeLock.unlock("assignPrimaryRole");
-        }
-    }
-
-    /**
-     * Messages the node with the given node ID to no longer have the primary role. If the messaging fails, then the node is marked as disconnected.
-     *
-     * @return true if the primary role was revoked from the node; false otherwise
-     */
-    private boolean revokePrimaryRole(final NodeIdentifier nodeId) {
-        writeLock.lock();
-        try {
-            // create primary role message
-            final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage();
-            msg.setNodeId(nodeId);
-            msg.setPrimary(false);
-            logger.info("Attempting to revoke primary role from node: " + nodeId);
-
-            // send message
-            senderListener.assignPrimaryRole(msg);
-
-            logger.info("Revoked primary role from node: " + nodeId);
-            addBulletin(nodeId, Severity.INFO, "Primary Role revoked from node");
-
-            // true indicates primary role was revoked
-            return true;
-        } catch (final ProtocolException ex) {
-
-            logger.warn("Failed attempt to revoke primary role from node " + nodeId + " due to " + ex);
-
-            // mark node as disconnected and log/record the event
-            final Node node = getRawNode(nodeId.getId());
-            node.setStatus(Status.DISCONNECTED);
-            addEvent(node.getNodeId(), "Disconnected because of failed attempt to revoke primary role.");
-            addBulletin(node, Severity.ERROR, "Node disconnected because of failed attempt to revoke primary role");
-
-            // false indicates primary role failed to be revoked
-            return false;
-        } finally {
-            writeLock.unlock("revokePrimaryRole");
-        }
-    }
 
     private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) {
         return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(),
@@ -1778,12 +1690,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     // get a raw reference to the node (if it doesn't exist, node will be null)
                     node = getRawNode(resolvedNodeIdentifier.getId());
 
-                    // if the node thinks it has the primary role, but the manager has assigned the role to a different node, then revoke the role
-                    if (mostRecentHeartbeat.isPrimary() && !isPrimaryNode(resolvedNodeIdentifier)) {
-                        addEvent(resolvedNodeIdentifier, "Heartbeat indicates node is running as primary node.  Revoking primary role because primary role is assigned to a different node.");
-                        revokePrimaryRole(resolvedNodeIdentifier);
-                    }
-
                     final boolean heartbeatIndicatesNotYetConnected = !mostRecentHeartbeat.isConnected();
 
                     if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
@@ -1871,6 +1777,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
                         // record heartbeat
                         node.setHeartbeat(mostRecentHeartbeat);
+
+                        if (mostRecentHeartbeat.isPrimary()) {
+                            setPrimaryNodeId(node.getNodeId());
+                        }
                     }
                 } catch (final Exception e) {
                     logger.error("Failed to process heartbeat from {}:{} due to {}",
@@ -1984,47 +1894,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-    @Override
-    public void setPrimaryNode(final String nodeId, final String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException {
-        writeLock.lock();
-        try {
-
-            final Node node = getNode(nodeId);
-            if (node == null) {
-                throw new UnknownNodeException("Node does not exist.");
-            } else if (Status.CONNECTED != node.getStatus()) {
-                throw new IneligiblePrimaryNodeException("Node must be connected before it can be assigned as the primary node.");
-            }
-
-            // revoke primary role
-            final Node primaryNode;
-            if ((primaryNode = getPrimaryNode()) != null) {
-                if (primaryNode.getStatus() == Status.DISCONNECTED) {
-                    throw new PrimaryRoleAssignmentException("A disconnected, primary node exists.  Delete the node before assigning the primary role to a different node.");
-                } else if (revokePrimaryRole(primaryNode.getNodeId())) {
-                    addEvent(primaryNode.getNodeId(), "Role revoked from this node as part of primary role reassignment.");
-                } else {
-                    throw new PrimaryRoleAssignmentException(
-                            "Failed to revoke primary role from node. Primary node is now disconnected. Delete the node before assigning the primary role to a different node.");
-                }
-            }
-
-            // change the primary node ID to the given node
-            setPrimaryNodeId(node.getNodeId());
-
-            // assign primary role
-            if (assignPrimaryRole(node.getNodeId())) {
-                addEvent(node.getNodeId(), "Role assigned to this node as part of primary role reassignment. Action performed by " + userDn);
-                addBulletin(node, Severity.INFO, "Primary Role assigned to node by " + userDn);
-            } else {
-                throw new PrimaryRoleAssignmentException(
-                        "Cluster manager assigned primary role to node, but the node failed to accept the assignment.  Cluster manager disconnected node.");
-            }
-        } finally {
-            writeLock.unlock("setPrimaryNode");
-        }
-    }
-
     private int getClusterProtocolHeartbeatSeconds() {
         return (int) FormatUtils.getTimeDuration(properties.getClusterProtocolHeartbeatInterval(), TimeUnit.SECONDS);
     }
@@ -4508,66 +4377,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-    private void executeSafeModeTask() {
-
-        new Thread(new Runnable() {
-
-            private final long threadStartTime = System.currentTimeMillis();
-
-            @Override
-            public void run() {
-                logger.info("Entering safe mode...");
-                final int safeModeSeconds = (int) FormatUtils.getTimeDuration(properties.getClusterManagerSafeModeDuration(), TimeUnit.SECONDS);
-                final long timeToElect = safeModeSeconds <= 0 ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
-                boolean exitSafeMode = false;
-                while (isRunning()) {
-
-                    writeLock.lock();
-                    try {
-
-                        final long currentTime = System.currentTimeMillis();
-                        if (timeToElect < currentTime) {
-                            final Set<NodeIdentifier> connectedNodeIds = getNodeIds(Status.CONNECTED);
-                            if (!connectedNodeIds.isEmpty()) {
-                                // get first connected node ID
-                                final NodeIdentifier connectedNodeId = connectedNodeIds.iterator().next();
-                                if (assignPrimaryRole(connectedNodeId)) {
-                                    try {
-                                        setPrimaryNodeId(connectedNodeId);
-                                        exitSafeMode = true;
-                                    } catch (final DaoException de) {
-                                        final String message = String.format("Failed to persist primary node ID '%s' in cluster dataflow.", connectedNodeId);
-                                        logger.warn(message);
-                                        addBulletin(connectedNodeId, Severity.WARNING, message);
-                                        revokePrimaryRole(connectedNodeId);
-                                    }
-                                }
-                            }
-                        }
-
-                        if (!isInSafeMode()) {
-                            // a primary node has been selected outside of this thread
-                            exitSafeMode = true;
-                            logger.info("Exiting safe mode because " + primaryNodeId + " has been assigned the primary role.");
-                            break;
-                        }
-                    } finally {
-                        writeLock.unlock("executeSafeModeTask");
-                    }
-
-                    if (!exitSafeMode) {
-                        // sleep for a bit
-                        try {
-                            Thread.sleep(1000);
-                        } catch (final InterruptedException ie) {
-                            return;
-                        }
-                    }
-
-                }
-            }
-        }).start();
-    }
 
     /**
      * This timer task simply processes any pending heartbeats. This timer task is not strictly needed, as HeartbeatMonitoringTimerTask will do this. However, this task is scheduled much more

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index dc5b7d3..ff3ecba 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -131,6 +131,15 @@
 
         <dependency>
             <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
             <artifactId>curator-test</artifactId>
             <scope>test</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 632fa1a..9f14354 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -83,6 +83,9 @@ import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.label.StandardLabel;
+import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
+import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
@@ -231,6 +234,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
     public static final String ROOT_GROUP_ID_ALIAS = "root";
     public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
+    public static final String PRIMARY_NODE_ROLE_NAME = "primary-node";
 
     private final AtomicInteger maxTimerDrivenThreads;
     private final AtomicInteger maxEventDrivenThreads;
@@ -277,6 +281,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private ProcessGroup rootGroup;
     private final List<Connectable> startConnectablesAfterInitialization;
     private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
+    private final LeaderElectionManager leaderElectionManager;
+
 
     /**
      * true if controller is configured to operate in a clustered environment
@@ -329,12 +335,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
     // guarded by rwLock
     /**
-     * true if controller is the primary of the cluster
-     */
-    private boolean primary;
-
-    // guarded by rwLock
-    /**
      * true if connected to a cluster
      */
     private boolean connected;
@@ -527,6 +527,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
 
         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
+        if (configuredForClustering) {
+            leaderElectionManager = new CuratorLeaderElectionManager(4);
+        } else {
+            leaderElectionManager = null;
+        }
     }
 
     private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) {
@@ -1159,6 +1164,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 throw new IllegalStateException("Controller already stopped or still stopping...");
             }
 
+            if (leaderElectionManager != null) {
+                leaderElectionManager.stop();
+            }
+
             if (kill) {
                 this.timerDrivenEngineRef.get().shutdownNow();
                 this.eventDrivenEngineRef.get().shutdownNow();
@@ -1365,7 +1374,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
         } finally {
             writeLock.unlock();
         }
@@ -3119,6 +3128,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             // update the bulletin repository
             if (isChanging) {
                 if (clustered) {
+                    leaderElectionManager.register(PRIMARY_NODE_ROLE_NAME, new LeaderElectionStateChangeListener() {
+                        @Override
+                        public void onLeaderElection() {
+                            setPrimary(true);
+                        }
+
+                        @Override
+                        public void onLeaderRelinquish() {
+                            setPrimary(false);
+                        }
+                    });
+
+                    leaderElectionManager.start();
                     stateManagerProvider.enableClusterProvider();
 
                     if (zooKeeperStateServer != null) {
@@ -3157,6 +3179,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                         LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
                     }
                 } else {
+                    leaderElectionManager.unregister(PRIMARY_NODE_ROLE_NAME);
+
                     if (zooKeeperStateServer != null) {
                         zooKeeperStateServer.shutdown();
                     }
@@ -3170,7 +3194,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
         } finally {
             writeLock.unlock();
         }
@@ -3180,51 +3204,38 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * @return true if this instance is the primary node in the cluster; false otherwise
      */
     public boolean isPrimary() {
-        rwLock.readLock().lock();
-        try {
-            return primary;
-        } finally {
-            rwLock.readLock().unlock();
-        }
+        return leaderElectionManager != null && leaderElectionManager.isLeader(PRIMARY_NODE_ROLE_NAME);
     }
 
     public void setPrimary(final boolean primary) {
-        rwLock.writeLock().lock();
-        try {
-            // no update, so return
-            if (this.primary == primary) {
-                return;
-            }
-
-            LOG.info("Setting primary flag from '" + this.primary + "' to '" + primary + "'");
-
-            final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
-            final ProcessGroup rootGroup = getGroup(getRootGroupId());
-            for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
-                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
-                }
+        final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
+        final ProcessGroup rootGroup = getGroup(getRootGroupId());
+        for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
+            try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
             }
-            for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
-                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
-                }
+        }
+        for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
+            try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
             }
-            for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
-                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
-                }
+        }
+        for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
+            try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
             }
+        }
 
-            // update primary
-            this.primary = primary;
-            eventDrivenWorkerQueue.setPrimary(primary);
+        // update primary
+        eventDrivenWorkerQueue.setPrimary(primary);
 
-            // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
-        } finally {
-            rwLock.writeLock().unlock();
-        }
+        // update the heartbeat bean
+        this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
+
+        // Emit a bulletin detailing the fact that the primary node state has changed
+        final String message = primary ? "This node has been elected Primary Node" : "This node is no longer Primary Node";
+        final Bulletin bulletin = BulletinFactory.createBulletin("Primary Node", Severity.INFO.name(), message);
+        bulletinRepository.addBulletin(bulletin);
     }
 
     static boolean areEqual(final String a, final String b) {
@@ -3603,7 +3614,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             this.connected = connected;
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
         } finally {
             rwLock.writeLock().unlock();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 6250c5a..67d0338 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -55,7 +55,6 @@ import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@@ -342,7 +341,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             case RECONNECTION_REQUEST:
             case DISCONNECTION_REQUEST:
             case FLOW_REQUEST:
-            case PRIMARY_ROLE:
                 return true;
             default:
                 return false;
@@ -381,14 +379,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                     }, "Disconnect from Cluster").start();
 
                     return null;
-                case PRIMARY_ROLE:
-                    new Thread(new Runnable() {
-                        @Override
-                        public void run() {
-                            handlePrimaryRoleAssignment((PrimaryRoleAssignmentMessage) request);
-                        }
-                    }, "Set Primary Role Status").start();
-                    return null;
                 default:
                     throw new ProtocolException("Handler cannot handle message type: " + request.getType());
             }
@@ -512,14 +502,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         }
     }
 
-    private void handlePrimaryRoleAssignment(final PrimaryRoleAssignmentMessage msg) {
-        writeLock.lock();
-        try {
-            controller.setPrimary(msg.isPrimary());
-        } finally {
-            writeLock.unlock();
-        }
-    }
 
     private void handleReconnectionRequest(final ReconnectionRequestMessage request) {
         writeLock.lock();
@@ -747,9 +729,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
             controller.setConnected(true);
 
-            // set primary
-            controller.setPrimary(response.isPrimary());
-
             // start the processors as indicated by the dataflow
             controller.onFlowInitialized(dataFlow.isAutoStartProcessors());
 
@@ -862,7 +841,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
     }
 
     private class SaveHolder {
-
         private final Calendar saveTime;
         private final boolean shouldArchive;
 
@@ -871,22 +849,4 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             shouldArchive = archive;
         }
     }
-
-    public boolean isPrimary() {
-        readLock.lock();
-        try {
-            return controller.isPrimary();
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    public void setPrimary(boolean primary) {
-        writeLock.lock();
-        try {
-            controller.setPrimary(primary);
-        } finally {
-            writeLock.unlock();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
new file mode 100644
index 0000000..4c0cbd0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.leader.election;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryForever;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.zookeeper.common.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CuratorLeaderElectionManager implements LeaderElectionManager {
+    private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class);
+
+    private final FlowEngine leaderElectionMonitorEngine;
+    private final int sessionTimeoutMs;
+    private final int connectionTimeoutMs;
+    private final String rootPath;
+    private final String connectString;
+
+    private CuratorFramework curatorClient;
+
+    private volatile boolean stopped = true;
+
+    private final Map<String, LeaderRole> leaderRoles = new HashMap<>();
+    private final Map<String, LeaderElectionStateChangeListener> registeredRoles = new HashMap<>();
+
+    public CuratorLeaderElectionManager(final int threadPoolSize) {
+        leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true);
+
+        final NiFiProperties properties = NiFiProperties.getInstance();
+
+        connectString = properties.getProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING);
+        if (connectString == null || connectString.trim().isEmpty()) {
+            throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is not set in nifi.properties");
+        }
+
+        sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
+        connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT);
+        rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE);
+
+        try {
+            PathUtils.validatePath(rootPath);
+        } catch (final IllegalArgumentException e) {
+            throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath);
+        }
+    }
+
+
+    @Override
+    public synchronized void start() {
+        if (!stopped) {
+            return;
+        }
+
+        stopped = false;
+
+        final RetryPolicy retryPolicy = new RetryForever(5000);
+        curatorClient = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
+        curatorClient.start();
+
+        // Call #register for each already-registered role. This will
+        // cause us to start listening for leader elections for that
+        // role again
+        for (final Map.Entry<String, LeaderElectionStateChangeListener> entry : registeredRoles.entrySet()) {
+            register(entry.getKey(), entry.getValue());
+        }
+
+        logger.info("{} started", this);
+    }
+
+    private int getTimePeriod(final NiFiProperties properties, final String propertyName, final String defaultValue) {
+        final String timeout = properties.getProperty(propertyName, defaultValue);
+        try {
+            return (int) FormatUtils.getTimeDuration(timeout, TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            logger.warn("Value of '" + propertyName + "' property is set to '" + timeout + "', which is not a valid time period. Using default of " + defaultValue);
+            return (int) FormatUtils.getTimeDuration(defaultValue, TimeUnit.MILLISECONDS);
+        }
+    }
+
+
+    @Override
+    public synchronized void register(final String roleName) {
+        register(roleName, null);
+    }
+
+
+    @Override
+    public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener) {
+        logger.debug("{} Registering new Leader Selector for role {}", this, roleName);
+
+        if (leaderRoles.containsKey(roleName)) {
+            logger.warn("{} Attempted to register Leader Election for role '{}' but this role is already registered", this, roleName);
+            return;
+        }
+
+        final String leaderPath = (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName;
+
+        try {
+            PathUtils.validatePath(rootPath);
+        } catch (final IllegalArgumentException e) {
+            throw new IllegalStateException("Cannot register leader election for role '" + roleName + "' because this is not a valid role name");
+        }
+
+        registeredRoles.put(roleName, listener);
+
+        if (!isStopped()) {
+            final ElectionListener electionListener = new ElectionListener(roleName, listener);
+            final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener);
+            leaderSelector.autoRequeue();
+            leaderSelector.start();
+
+            final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener);
+
+            leaderRoles.put(roleName, leaderRole);
+        }
+        logger.info("{} Registered new Leader Selector for role {}", this, roleName);
+    }
+
+    @Override
+    public synchronized void unregister(final String roleName) {
+        registeredRoles.remove(roleName);
+
+        final LeaderRole leaderRole = leaderRoles.remove(roleName);
+        final LeaderSelector leaderSelector = leaderRole.getLeaderSelector();
+        if (leaderSelector == null) {
+            logger.warn("Cannot unregister Leader Election Role '{}' becuase that role is not registered", roleName);
+            return;
+        }
+
+        leaderSelector.close();
+    }
+
+    @Override
+    public synchronized void stop() {
+        stopped = true;
+
+        for (final LeaderRole role : leaderRoles.values()) {
+            final LeaderSelector selector = role.getLeaderSelector();
+            selector.close();
+        }
+
+        leaderRoles.clear();
+
+        if (curatorClient != null) {
+            curatorClient.close();
+            curatorClient = null;
+        }
+
+        logger.info("{} stopped and closed", this);
+    }
+
+    @Override
+    public boolean isStopped() {
+        return stopped;
+    }
+
+
+    @Override
+    public String toString() {
+        return "CuratorLeaderElectionManager[stopped=" + isStopped() + "]";
+    }
+
+
+    @Override
+    public synchronized boolean isLeader(final String roleName) {
+        final LeaderRole role = leaderRoles.get(roleName);
+        if (role == null) {
+            return false;
+        }
+
+        return role.isLeader();
+    }
+
+
+    private static class LeaderRole {
+        private final LeaderSelector leaderSelector;
+        private final ElectionListener electionListener;
+
+        public LeaderRole(final LeaderSelector leaderSelector, final ElectionListener electionListener) {
+            this.leaderSelector = leaderSelector;
+            this.electionListener = electionListener;
+        }
+
+        public LeaderSelector getLeaderSelector() {
+            return leaderSelector;
+        }
+
+        public boolean isLeader() {
+            return electionListener.isLeader();
+        }
+    }
+
+
+    private class ElectionListener extends LeaderSelectorListenerAdapter implements LeaderSelectorListener {
+        private final String roleName;
+        private final LeaderElectionStateChangeListener listener;
+
+        private volatile boolean leader;
+
+        public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener) {
+            this.roleName = roleName;
+            this.listener = listener;
+        }
+
+        public boolean isLeader() {
+            return leader;
+        }
+
+        @Override
+        public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
+            logger.info("{} Connection State changed to {}", this, newState.name());
+            super.stateChanged(client, newState);
+        }
+
+        @Override
+        public void takeLeadership(final CuratorFramework client) throws Exception {
+            leader = true;
+            logger.info("{} This node has been elected Leader for Role '{}'", this, roleName);
+
+            if (listener != null) {
+                leaderElectionMonitorEngine.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        listener.onLeaderElection();
+                    }
+                });
+            }
+
+            // Curator API states that we lose the leadership election when we return from this method,
+            // so we will block as long as we are not interrupted or closed. Then, we will set leader to false.
+            try {
+                while (!isStopped()) {
+                    try {
+                        Thread.sleep(1000L);
+                    } catch (final InterruptedException ie) {
+                        logger.info("{} has been interrupted; no longer leader for role '{}'", this, roleName);
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                }
+            } finally {
+                leader = false;
+                logger.info("{} This node is no longer leader for role '{}'", this, roleName);
+
+                if (listener != null) {
+                    leaderElectionMonitorEngine.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            listener.onLeaderRelinquish();
+                        }
+                    });
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
new file mode 100644
index 0000000..d16dbdb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.leader.election;
+
+public interface LeaderElectionManager {
+    /**
+     * Starts managing leader elections for all registered roles
+     */
+    void start();
+
+    /**
+     * Adds a new role for which a leader is required
+     *
+     * @param roleName the name of the role
+     */
+    void register(String roleName);
+
+    /**
+     * Adds a new role for which a leader is required
+     *
+     * @param roleName the name of the role
+     * @param listener a listener that will be called when the node gains or relinquishes
+     *            the role of leader
+     */
+    void register(String roleName, LeaderElectionStateChangeListener listener);
+
+    /**
+     * Removes the role with the given name from this manager. If this
+     * node is the elected leader for the given role, this node will relinquish
+     * the leadership role
+     *
+     * @param roleName the name of the role to unregister
+     */
+    void unregister(String roleName);
+
+    /**
+     * Returns a boolean value indicating whether or not this node
+     * is the elected leader for the given role
+     *
+     * @param roleName the name of the role
+     * @return <code>true</code> if the node is the elected leader, <code>false</code> otherwise.
+     */
+    boolean isLeader(String roleName);
+
+    /**
+     * @return <code>true</code> if the manager is stopped, false otherwise.
+     */
+    boolean isStopped();
+
+    /**
+     * Stops managing leader elections and relinquishes the role as leader
+     * for all registered roles. If the LeaderElectionManager is later started
+     * again, all previously registered roles will still be registered.
+     */
+    void stop();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionStateChangeListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionStateChangeListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionStateChangeListener.java
new file mode 100644
index 0000000..79c7a75
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionStateChangeListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.leader.election;
+
+/**
+ * Callback interface that can be used to listen for state changes so that the node
+ * can be notified when it becomes the Elected Leader for a role or is no longer the
+ * Elected Leader
+ */
+public interface LeaderElectionStateChangeListener {
+    /**
+     * This method is invoked whenever this node is elected leader
+     */
+    void onLeaderElection();
+
+    /**
+     * This method is invoked whenever this node no longer is the elected leader.
+     */
+    void onLeaderRelinquish();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 24d2295..beb71c1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -168,6 +168,12 @@ nifi.cluster.node.protocol.threads=${nifi.cluster.node.protocol.threads}
 nifi.cluster.node.unicast.manager.address=${nifi.cluster.node.unicast.manager.address}
 nifi.cluster.node.unicast.manager.protocol.port=${nifi.cluster.node.unicast.manager.protocol.port}
 
+# zookeeper properties, used for cluster management #
+nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string}
+nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout}
+nifi.zookeeper.session.timeout=${nifi.zookeeper.session.timeout}
+nifi.zookeeper.root.node=${nifi.zookeeper.root.node}
+
 # cluster manager properties (only configure for cluster manager) #
 nifi.cluster.is.manager=${nifi.cluster.is.manager}
 nifi.cluster.manager.address=${nifi.cluster.manager.address}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index b981bde..4fdda06 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -760,12 +760,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             clusterManager.requestReconnection(nodeDTO.getNodeId(), userDn);
         } else if (Node.Status.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
             clusterManager.requestDisconnection(nodeDTO.getNodeId(), userDn);
-        } else {
-            // handle primary
-            final Boolean primary = nodeDTO.isPrimary();
-            if (primary != null && primary) {
-                clusterManager.setPrimaryNode(nodeDTO.getNodeId(), userDn);
-            }
         }
 
         final String nodeId = nodeDTO.getNodeId();

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dc764b0..442acdd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -765,6 +765,17 @@ language governing permissions and limitations under the License. -->
                 <artifactId>zookeeper</artifactId>
                 <version>3.4.6</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-framework</artifactId>
+                <version>2.10.0</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-recipes</artifactId>
+                <version>2.10.0</version>
+            </dependency>
+
             
             <!-- Test Dependencies for testing interactions with ZooKeeper -->
             <dependency>
@@ -779,6 +790,8 @@ language governing permissions and limitations under the License. -->
                 <version>6.8.8</version>
                 <scope>test</scope>
             </dependency>
+            
+            
             <dependency>            
                 <groupId>org.jsoup</groupId>
                 <artifactId>jsoup</artifactId>