You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/09/06 20:32:12 UTC

[2/2] nifi git commit: NIFI-1966: When cluster is started up, do not assume that Cluster Coordinator has the golden copy of the flow but instead wait for some period of time or until the required number of nodes have connected, and then choose which flow

NIFI-1966: When cluster is started up, do not assume that Cluster Coordinator has the golden copy of the flow but instead wait for some period of time or until the required number of nodes have connected, and then choose which flow is correct. This closes #977


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a7e76cc0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a7e76cc0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a7e76cc0

Branch: refs/heads/master
Commit: a7e76cc00a13815d899267ce9e4ca1ad6cf51ce1
Parents: 7a45193
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Aug 30 16:59:50 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Tue Sep 6 16:31:37 2016 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/util/NiFiProperties.java    |  15 +-
 .../src/main/asciidoc/administration-guide.adoc |  23 +-
 .../coordination/ClusterCoordinator.java        |  10 +
 .../cluster/protocol/ConnectionRequest.java     |   8 +-
 .../cluster/protocol/ConnectionResponse.java    |  10 +-
 .../jaxb/message/AdaptedConnectionRequest.java  |  11 +
 .../jaxb/message/ConnectionRequestAdapter.java  |   3 +-
 .../jaxb/message/ConnectionResponseAdapter.java |   2 +-
 .../message/HeartbeatResponseMessage.java       |  10 +-
 .../cluster/coordination/flow/FlowElection.java |  75 ++++++
 .../flow/PopularVoteFlowElection.java           | 240 +++++++++++++++++++
 .../PopularVoteFlowElectionFactoryBean.java     |  66 +++++
 .../heartbeat/AbstractHeartbeatMonitor.java     |   3 +-
 .../ClusterProtocolHeartbeatMonitor.java        |  10 +-
 .../node/NodeClusterCoordinator.java            | 154 +++++++++---
 .../NodeClusterCoordinatorFactoryBean.java      |   4 +-
 .../resources/nifi-cluster-manager-context.xml  |   4 +
 .../flow/TestPopularVoteFlowElection.java       | 104 ++++++++
 .../heartbeat/TestAbstractHeartbeatMonitor.java |  10 +
 .../node/TestNodeClusterCoordinator.java        |  55 ++++-
 .../nifi/cluster/integration/Cluster.java       |  20 +-
 .../apache/nifi/cluster/integration/Node.java   |  14 +-
 .../src/test/resources/conf/empty-flow.xml      |  27 +++
 .../src/test/resources/conf/non-empty-flow.xml  |  25 ++
 .../apache/nifi/controller/FlowController.java  | 103 ++++----
 .../nifi/controller/StandardFlowService.java    |  42 ++--
 .../cluster/ClusterProtocolHeartbeater.java     |  24 +-
 .../election/CuratorLeaderElectionManager.java  |   7 +-
 .../nifi/fingerprint/FingerprintFactory.java    |  36 ++-
 .../nifi-framework/nifi-resources/pom.xml       |   2 +
 .../src/main/resources/conf/nifi.properties     |   2 +
 .../nifi/web/api/ApplicationResource.java       | 101 +++++---
 .../src/main/resources/nifi-web-api-context.xml |  23 ++
 .../src/main/webapp/js/nf/nf-common.js          |   2 +
 34 files changed, 1052 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 619c104..3fab706 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -169,6 +169,8 @@ public abstract class NiFiProperties {
     public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout";
     public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout";
     public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file";
+    public static final String FLOW_ELECTION_MAX_WAIT_TIME = "nifi.cluster.flow.election.max.wait.time";
+    public static final String FLOW_ELECTION_MAX_CANDIDATES = "nifi.cluster.flow.election.max.candidates";
 
     // zookeeper properties
     public static final String ZOOKEEPER_CONNECT_STRING = "nifi.zookeeper.connect.string";
@@ -239,6 +241,7 @@ public abstract class NiFiProperties {
     // cluster node defaults
     public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 2;
     public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "15 secs";
+    public static final String DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME = "5 mins";
 
     // state management defaults
     public static final String DEFAULT_STATE_MANAGEMENT_CONFIG_FILE = "conf/state-management.xml";
@@ -309,12 +312,12 @@ public abstract class NiFiProperties {
 
     public Integer getIntegerProperty(final String propertyName, final Integer defaultValue) {
         final String value = getProperty(propertyName);
-        if (value == null) {
+        if (value == null || value.trim().isEmpty()) {
             return defaultValue;
         }
 
         try {
-            return Integer.parseInt(getProperty(propertyName));
+            return Integer.parseInt(value.trim());
         } catch (final Exception e) {
             return defaultValue;
         }
@@ -935,6 +938,14 @@ public abstract class NiFiProperties {
         return getProperty(FLOW_CONFIGURATION_ARCHIVE_DIR);
     }
 
+    public String getFlowElectionMaxWaitTime() {
+        return getProperty(FLOW_ELECTION_MAX_WAIT_TIME, DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME);
+    }
+
+    public Integer getFlowElectionMaxCandidates() {
+        return getIntegerProperty(FLOW_ELECTION_MAX_CANDIDATES, null);
+    }
+
     public String getFlowConfigurationArchiveMaxTime() {
         return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_TIME, DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_TIME);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index eb0599b..5f0301e 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -1171,6 +1171,20 @@ There are cases where a DFM may wish to continue making changes to the flow, eve
 In this case, they DFM may elect to remove the node from the cluster entirely through the Cluster Management dialog. Once removed,
 the node cannot be rejoined to the cluster until it has been restarted.
 
+*Flow Election* +
+When a cluster first starts up, NiFi must determine which of the nodes have the
+"correct" version of the flow. This is done by voting on the flows that each of the nodes has. When a node
+attempts to connect to a cluster, it provides a copy of its local flow to the Cluster Coordinator. If no flow
+has yet been elected the "correct" flow, the node's flow is compared to each of the other Nodes' flows. If another
+Node's flow matches this one, a vote is cast for this flow. If no other Node has reported the same flow yet, this
+flow will be added to the pool of possibly elected flows with one vote. After
+some amount of time has elapsed (configured by setting the `nifi.cluster.flow.election.max.wait.time` property) or
+some number of Nodes have cast votes (configured by setting the `nifi.cluster.flow.election.max.candidates` property),
+a flow is elected to be the "correct" copy of the flow. All nodes that have incompatible flows are then disconnected
+from the cluster while those with compatible flows inherit the cluster's flow. Election is performed according to
+the "popular vote" with the caveat that the winner will never be an "empty flow" unless all flows are empty. This
+allows an administrator to remove a node's `flow.xml.gz` file and restart the node, knowing that the node's flow will
+not be voted to be the "correct" flow unless no other flow is found.
 
 *Basic Cluster Setup* +
 
@@ -1204,8 +1218,13 @@ For each Node, the minimum properties to configure are as follows:
    that should be used for storing data. The default value is _/root_. This is important to set correctly, as which cluster
    the NiFi instance attempts to join is determined by which ZooKeeper instance it connects to and the ZooKeeper Root Node
    that is specified.
-** nifi.cluster.request.replication.claim.timeout - Specifies how long a component can be 'locked' during a request replication
-   before the lock expires and is automatically unlocked. See <<claim_management>> for more information.
+** nifi.cluster.flow.election.max.wait.time - Specifies the amount of time to wait before electing a Flow as the "correct" Flow.
+   If the number of Nodes that have voted is equal to the number specified by the `nifi.cluster.flow.election.max.candidates`
+   property, the cluster will not wait this long. The default is 5 minutes. Note that the time starts as soon as the first vote
+   is cast.
+** nifi.cluster.flow.election.max.candidates - Specifies the number of Nodes required in the cluster to cause early election
+   of Flows. This allows the Nodes in the cluster to avoid having to wait a long time before starting processing if we reach
+   at least this number of nodes in the cluster.
 
 Now, it is possible to start up the cluster. It does not matter which order the instances start up. Navigate to the URL for
 one of the nodes, and the User Interface should look similar to the following:

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index 49c6142..723a374 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -226,4 +226,14 @@ public interface ClusterCoordinator {
      * @return <code>true</code> if connected, <code>false</code> otherwise
      */
     boolean isConnected();
+
+    /**
+     * @return <code>true</code> if Flow Election is complete, <code>false</code> otherwise
+     */
+    boolean isFlowElectionComplete();
+
+    /**
+     * @return the current status of Flow Election.
+     */
+    String getFlowElectionStatus();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
index 0e27155..2e87402 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
@@ -28,16 +28,22 @@ import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionRequestAdapter;
 public class ConnectionRequest {
 
     private final NodeIdentifier proposedNodeIdentifier;
+    private final DataFlow dataFlow;
 
-    public ConnectionRequest(final NodeIdentifier proposedNodeIdentifier) {
+    public ConnectionRequest(final NodeIdentifier proposedNodeIdentifier, final DataFlow dataFlow) {
         if (proposedNodeIdentifier == null) {
             throw new IllegalArgumentException("Proposed node identifier may not be null.");
         }
+
         this.proposedNodeIdentifier = proposedNodeIdentifier;
+        this.dataFlow = dataFlow;
     }
 
     public NodeIdentifier getProposedNodeIdentifier() {
         return proposedNodeIdentifier;
     }
 
+    public DataFlow getDataFlow() {
+        return dataFlow;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
index 4e572af..c0717a9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
@@ -49,9 +49,8 @@ public class ConnectionResponse {
 
         if (nodeIdentifier == null) {
             throw new IllegalArgumentException("Node identifier may not be empty or null.");
-        } else if (dataFlow == null) {
-            throw new IllegalArgumentException("DataFlow may not be null.");
         }
+
         this.nodeIdentifier = nodeIdentifier;
         this.dataFlow = dataFlow;
         this.tryLaterSeconds = 0;
@@ -61,14 +60,14 @@ public class ConnectionResponse {
         this.componentRevisions = componentRevisions == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(componentRevisions));
     }
 
-    public ConnectionResponse(final int tryLaterSeconds) {
+    public ConnectionResponse(final int tryLaterSeconds, final String explanation) {
         if (tryLaterSeconds <= 0) {
-            throw new IllegalArgumentException("Try-Later seconds may not be nonnegative: " + tryLaterSeconds);
+            throw new IllegalArgumentException("Try-Later seconds must be nonnegative: " + tryLaterSeconds);
         }
         this.dataFlow = null;
         this.nodeIdentifier = null;
         this.tryLaterSeconds = tryLaterSeconds;
-        this.rejectionReason = null;
+        this.rejectionReason = explanation;
         this.instanceId = null;
         this.nodeStatuses = null;
         this.componentRevisions = null;
@@ -120,7 +119,6 @@ public class ConnectionResponse {
         return instanceId;
     }
 
-
     public List<NodeConnectionStatus> getNodeConnectionStatuses() {
         return nodeStatuses;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
index 85f4a3f..06ce14a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.cluster.protocol.jaxb.message;
 
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 
 /**
@@ -24,6 +26,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
 public class AdaptedConnectionRequest {
 
     private NodeIdentifier nodeIdentifier;
+    private DataFlow dataFlow;
 
     public AdaptedConnectionRequest() {
     }
@@ -37,4 +40,12 @@ public class AdaptedConnectionRequest {
         this.nodeIdentifier = nodeIdentifier;
     }
 
+    @XmlJavaTypeAdapter(DataFlowAdapter.class)
+    public DataFlow getDataFlow() {
+        return dataFlow;
+    }
+
+    public void setDataFlow(final DataFlow dataFlow) {
+        this.dataFlow = dataFlow;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
index 21f9770..0faea39 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
@@ -28,13 +28,14 @@ public class ConnectionRequestAdapter extends XmlAdapter<AdaptedConnectionReques
         final AdaptedConnectionRequest aCr = new AdaptedConnectionRequest();
         if (cr != null) {
             aCr.setNodeIdentifier(cr.getProposedNodeIdentifier());
+            aCr.setDataFlow(cr.getDataFlow());
         }
         return aCr;
     }
 
     @Override
     public ConnectionRequest unmarshal(final AdaptedConnectionRequest aCr) {
-        return new ConnectionRequest(aCr.getNodeIdentifier());
+        return new ConnectionRequest(aCr.getNodeIdentifier(), aCr.getDataFlow());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
index 470843e..4b4c0fc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
@@ -41,7 +41,7 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo
     @Override
     public ConnectionResponse unmarshal(final AdaptedConnectionResponse aCr) {
         if (aCr.shouldTryLater()) {
-            return new ConnectionResponse(aCr.getTryLaterSeconds());
+            return new ConnectionResponse(aCr.getTryLaterSeconds(), aCr.getRejectionReason());
         } else if (aCr.getRejectionReason() != null) {
             return ConnectionResponse.createRejectionResponse(aCr.getRejectionReason());
         } else {

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
index cbb8b48..e508185 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
@@ -28,7 +28,7 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 public class HeartbeatResponseMessage extends ProtocolMessage {
 
     private List<NodeConnectionStatus> updatedNodeStatuses = new ArrayList<>();
-
+    private String flowElectionMessage = null;
 
     @Override
     public MessageType getType() {
@@ -42,4 +42,12 @@ public class HeartbeatResponseMessage extends ProtocolMessage {
     public void setUpdatedNodeStatuses(final List<NodeConnectionStatus> nodeStatuses) {
         this.updatedNodeStatuses = new ArrayList<>(nodeStatuses);
     }
+
+    public String getFlowElectionMessage() {
+        return flowElectionMessage;
+    }
+
+    public void setFlowElectionMessage(String flowElectionMessage) {
+        this.flowElectionMessage = flowElectionMessage;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/FlowElection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/FlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/FlowElection.java
new file mode 100644
index 0000000..70e72c5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/FlowElection.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.flow;
+
+import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * <p>
+ * A FlowElection is responsible for examining multiple versions of a dataflow and determining which of
+ * the versions is the "correct" version of the flow.
+ * </p>
+ */
+public interface FlowElection {
+
+    /**
+     * Checks if the election has completed or not.
+     *
+     * @return <code>true</code> if the election has completed, <code>false</code> otherwise.
+     */
+    boolean isElectionComplete();
+
+    /**
+     * Returns <code>true</code> if a vote has already been counted for the given Node Identifier, <code>false</code> otherwise.
+     *
+     * @param nodeIdentifier the identifier of the node
+     * @return <code>true</code> if a vote has already been counted for the given Node Identifier, <code>false</code> otherwise.
+     */
+    boolean isVoteCounted(NodeIdentifier nodeIdentifier);
+
+    /**
+     * If the election has not yet completed, adds the given DataFlow to the list of candidates
+     * (if it is not already in the running) and increments the number of votes for this DataFlow by 1.
+     * If the election has completed, the given candidate is ignored, and the already-elected DataFlow
+     * will be returned. If the election has not yet completed, a vote will be cast for the given
+     * candidate and <code>null</code> will be returned, signifying that no candidate has yet been chosen.
+     *
+     * @param candidate the DataFlow to vote for and add to the pool of candidates if not already present
+     * @param nodeIdentifier the identifier of the node casting the vote
+     *
+     * @return the elected {@link DataFlow}, or <code>null</code> if no DataFlow has yet been elected
+     */
+    DataFlow castVote(DataFlow candidate, NodeIdentifier nodeIdentifier);
+
+    /**
+     * Returns the DataFlow that has been elected as the "correct" version of the flow, or <code>null</code>
+     * if the election has not yet completed.
+     *
+     * @return the DataFlow that has been elected as the "correct" version of the flow, or <code>null</code>
+     *         if the election has not yet completed.
+     */
+    DataFlow getElectedDataFlow();
+
+    /**
+     * Returns a human-readable description of the status of the election
+     *
+     * @return a human-readable description of the status of the election
+     */
+    String getStatusDescription();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java
new file mode 100644
index 0000000..bc730d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.flow;
+
+import static java.util.Objects.requireNonNull;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.StandardFlowSynchronizer;
+import org.apache.nifi.fingerprint.FingerprintFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * An implementation of {@link FlowElection} that waits until either a maximum amount of time has elapsed
+ * or a maximum number of Data Flows have entered the running to be elected, and then elects the 'winner'
+ * based on the number of 'votes' that a particular DataFlow has received. This implementation considers
+ * two Flows with the same fingerprint to be the same Flow. If there is a tie in the number of votes for
+ * a particular DataFlow, one will be chosen in a non-deterministic manner. If multiple DataFlows are
+ * presented with the same fingerprint but different Flows (for instance, the position of a component has
+ * changed), one of the Flows with that fingerprint will be chosen in a non-deterministic manner.
+ * </p>
+ */
+public class PopularVoteFlowElection implements FlowElection {
+    private static final Logger logger = LoggerFactory.getLogger(PopularVoteFlowElection.class);
+
+    private final long maxWaitNanos;
+    private final Integer maxNodes;
+    private final FingerprintFactory fingerprintFactory;
+
+    private volatile Long startNanos = null;
+    private volatile DataFlow electedDataFlow = null;
+
+    private final Map<String, FlowCandidate> candidateByFingerprint = new HashMap<>();
+
+    public PopularVoteFlowElection(final long maxWait, final TimeUnit maxWaitPeriod, final Integer maxNodes, final FingerprintFactory fingerprintFactory) {
+        this.maxWaitNanos = maxWaitPeriod.toNanos(maxWait);
+        if (maxWaitNanos < 1) {
+            throw new IllegalArgumentException("Maximum wait time to elect Cluster Flow cannot be less than 1 nanosecond");
+        }
+
+        this.maxNodes = maxNodes;
+        if (maxNodes != null && maxNodes < 1) {
+            throw new IllegalArgumentException("Maximum number of nodes to wait on before electing Cluster Flow cannot be less than 1");
+        }
+
+        this.fingerprintFactory = requireNonNull(fingerprintFactory);
+    }
+
+    @Override
+    public synchronized boolean isElectionComplete() {
+        if (electedDataFlow != null) {
+            return true;
+        }
+
+        if (startNanos == null) {
+            return false;
+        }
+
+        final long nanosSinceStart = System.nanoTime() - startNanos;
+        if (nanosSinceStart > maxWaitNanos) {
+            final FlowCandidate elected = performElection();
+            logger.info("Election is complete because the maximum allowed time has elapsed. "
+                + "The elected dataflow is held by the following nodes: {}", elected.getNodes());
+
+            return true;
+        } else if (maxNodes != null) {
+            final int numVotes = getVoteCount();
+            if (numVotes >= maxNodes) {
+                final FlowCandidate elected = performElection();
+                logger.info("Election is complete because the required number of nodes ({}) have voted. "
+                    + "The elected dataflow is held by the following nodes: {}", maxNodes, elected.getNodes());
+
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean isVoteCounted(final NodeIdentifier nodeIdentifier) {
+        return candidateByFingerprint.values().stream()
+            .anyMatch(candidate -> candidate.getNodes().contains(nodeIdentifier));
+    }
+
+    private synchronized int getVoteCount() {
+        return candidateByFingerprint.values().stream().mapToInt(candidate -> candidate.getVotes()).sum();
+    }
+
+    @Override
+    public synchronized DataFlow castVote(final DataFlow candidate, final NodeIdentifier nodeId) {
+        if (candidate == null || isElectionComplete()) {
+            return getElectedDataFlow();
+        }
+
+        final String fingerprint = fingerprint(candidate);
+        final FlowCandidate flowCandidate = candidateByFingerprint.computeIfAbsent(fingerprint, key -> new FlowCandidate(candidate));
+        final boolean voteCast = flowCandidate.vote(nodeId);
+
+        if (startNanos == null) {
+            startNanos = System.nanoTime();
+        }
+
+        if (voteCast) {
+            logger.info("Vote cast by {}; this flow now has {} votes", nodeId, flowCandidate.getVotes());
+        }
+
+        if (isElectionComplete()) {
+            return getElectedDataFlow();
+        }
+
+        return null; // no elected candidate so return null
+    }
+
+    private String fingerprint(final DataFlow dataFlow) {
+        final String flowFingerprint = fingerprintFactory.createFingerprint(dataFlow.getFlow());
+        final String authFingerprint = dataFlow.getAuthorizerFingerprint() == null ? "" : new String(dataFlow.getAuthorizerFingerprint(), StandardCharsets.UTF_8);
+        final String candidateFingerprint = flowFingerprint + authFingerprint;
+
+        return candidateFingerprint;
+    }
+
+    @Override
+    public DataFlow getElectedDataFlow() {
+        return electedDataFlow;
+    }
+
+    private FlowCandidate performElection() {
+        if (candidateByFingerprint.isEmpty()) {
+            return null;
+        }
+
+        final FlowCandidate elected;
+        if (candidateByFingerprint.size() == 1) {
+            elected = candidateByFingerprint.values().iterator().next();
+        } else {
+            elected = candidateByFingerprint.values().stream()
+                .filter(candidate -> !candidate.isFlowEmpty())  // We have more than 1 fingerprint. Do not consider empty flows.
+                .max((candidate1, candidate2) -> Integer.compare(candidate1.getVotes(), candidate2.getVotes()))
+                .get();
+        }
+
+        this.electedDataFlow = elected.getDataFlow();
+        return elected;
+    }
+
+    @Override
+    public synchronized String getStatusDescription() {
+        if (startNanos == null) {
+            return "No votes have yet been cast.";
+        }
+
+        final StringBuilder descriptionBuilder = new StringBuilder("Election will complete in ");
+        final long nanosElapsed = System.nanoTime() - startNanos;
+        final long nanosLeft = maxWaitNanos - nanosElapsed;
+        final long secsLeft = TimeUnit.NANOSECONDS.toSeconds(nanosLeft);
+        if (secsLeft < 1) {
+            descriptionBuilder.append("less than 1 second");
+        } else {
+            descriptionBuilder.append(secsLeft).append(" seconds");
+        }
+
+        if (maxNodes != null) {
+            final int votesNeeded = maxNodes.intValue() - getVoteCount();
+            descriptionBuilder.append(" or after ").append(votesNeeded).append(" more vote");
+            descriptionBuilder.append(votesNeeded == 1 ? " is " : "s are ");
+            descriptionBuilder.append("cast, whichever occurs first.");
+        }
+
+        return descriptionBuilder.toString();
+    }
+
+    private static class FlowCandidate {
+        private final DataFlow dataFlow;
+        private final AtomicInteger voteCount = new AtomicInteger(0);
+        private final Set<NodeIdentifier> nodeIds = Collections.synchronizedSet(new HashSet<>());
+
+        public FlowCandidate(final DataFlow dataFlow) {
+            this.dataFlow = dataFlow;
+        }
+
+        /**
+         * Casts a vote for this candidate for the given node identifier, if a vote has not already
+         * been cast for this node identifier
+         *
+         * @param nodeId the node id that is casting the vote
+         * @return <code>true</code> if the vote was case, <code>false</code> if this node id has already cast its vote
+         */
+        public boolean vote(final NodeIdentifier nodeId) {
+            if (nodeIds.add(nodeId)) {
+                voteCount.incrementAndGet();
+                return true;
+            }
+
+            return false;
+        }
+
+        public int getVotes() {
+            return voteCount.get();
+        }
+
+        public DataFlow getDataFlow() {
+            return dataFlow;
+        }
+
+        public boolean isFlowEmpty() {
+            return StandardFlowSynchronizer.isEmpty(dataFlow);
+        }
+
+        public Set<NodeIdentifier> getNodes() {
+            return nodeIds;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElectionFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElectionFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElectionFactoryBean.java
new file mode 100644
index 0000000..09d3845
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElectionFactoryBean.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.flow;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.fingerprint.FingerprintFactory;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.FactoryBean;
+
+public class PopularVoteFlowElectionFactoryBean implements FactoryBean<PopularVoteFlowElection> {
+    private static final Logger logger = LoggerFactory.getLogger(PopularVoteFlowElectionFactoryBean.class);
+    private NiFiProperties properties;
+
+    @Override
+    public PopularVoteFlowElection getObject() throws Exception {
+        final String maxWaitTime = properties.getFlowElectionMaxWaitTime();
+        long maxWaitMillis;
+        try {
+            maxWaitMillis = FormatUtils.getTimeDuration(maxWaitTime, TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            logger.warn("Failed to parse value of property '{}' as a valid time period. Value was '{}'. Ignoring this value and using the default value of '{}'",
+                NiFiProperties.FLOW_ELECTION_MAX_WAIT_TIME, maxWaitTime, NiFiProperties.DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME);
+            maxWaitMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME, TimeUnit.MILLISECONDS);
+        }
+
+        final Integer maxNodes = properties.getFlowElectionMaxCandidates();
+
+        final StringEncryptor encryptor = StringEncryptor.createEncryptor(properties);
+        final FingerprintFactory fingerprintFactory = new FingerprintFactory(encryptor);
+        return new PopularVoteFlowElection(maxWaitMillis, TimeUnit.MILLISECONDS, maxNodes, fingerprintFactory);
+    }
+
+    @Override
+    public Class<?> getObjectType() {
+        return PopularVoteFlowElection.class;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public void setProperties(final NiFiProperties properties) {
+        this.properties = properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 8b0dc97..5119dac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -121,7 +121,8 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
      * Visible for testing.
      */
     protected synchronized void monitorHeartbeats() {
-        if (!clusterCoordinator.isActiveClusterCoordinator()) {
+        final NodeIdentifier activeCoordinator = clusterCoordinator.getElectedActiveCoordinatorNode();
+        if (activeCoordinator != null && !activeCoordinator.equals(clusterCoordinator.getLocalNodeIdentifier())) {
             // Occasionally Curator appears to not notify us that we have lost the elected leader role, or does so
             // on a very large delay. So before we kick the node out of the cluster, we want to first check what the
             // ZNode in ZooKeeper says, and ensure that this is the node that is being advertised as the appropriate

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index 716610c..9f620d9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -153,11 +153,19 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
 
         // Formulate a List of differences between our view of the cluster topology and the node's view
         // and send that back to the node so that it is in-sync with us
-        final List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus();
+        List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus();
+        if (nodeStatusList == null) {
+            nodeStatusList = Collections.emptyList();
+        }
         final List<NodeConnectionStatus> updatedStatuses = getUpdatedStatuses(nodeStatusList);
 
         final HeartbeatResponseMessage responseMessage = new HeartbeatResponseMessage();
         responseMessage.setUpdatedNodeStatuses(updatedStatuses);
+
+        if (!getClusterCoordinator().isFlowElectionComplete()) {
+            responseMessage.setFlowElectionMessage(getClusterCoordinator().getFlowElectionStatus());
+        }
+
         return responseMessage;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index e50d8fa..a6a6009 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.flow.FlowElection;
 import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
 import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
 import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
@@ -86,16 +87,18 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     private final NiFiProperties nifiProperties;
     private final LeaderElectionManager leaderElectionManager;
     private final AtomicLong latestUpdateId = new AtomicLong(-1);
+    private final FlowElection flowElection;
 
     private volatile FlowService flowService;
     private volatile boolean connected;
     private volatile boolean closed = false;
+    private volatile boolean requireElection = true;
 
     private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
     private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
 
     public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager,
-            final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties) {
+            final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties) {
         this.senderListener = senderListener;
         this.flowService = null;
         this.eventReporter = eventReporter;
@@ -103,6 +106,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         this.revisionManager = revisionManager;
         this.nifiProperties = nifiProperties;
         this.leaderElectionManager = leaderElectionManager;
+        this.flowElection = flowElection;
 
         senderListener.addHandler(this);
     }
@@ -115,9 +119,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
         closed = true;
 
-        final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
-        updateNodeStatus(shutdownStatus, false);
-        logger.info("Successfully notified other nodes that I am shutting down");
+        final NodeIdentifier localId = getLocalNodeIdentifier();
+        if (localId != null) {
+            final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(localId, DisconnectionCode.NODE_SHUTDOWN);
+            updateNodeStatus(shutdownStatus, false);
+            logger.info("Successfully notified other nodes that I am shutting down");
+        }
     }
 
     @Override
@@ -230,6 +237,15 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
     @Override
     public void requestNodeConnect(final NodeIdentifier nodeId, final String userDn) {
+        if (requireElection && !flowElection.isElectionComplete() && flowElection.isVoteCounted(nodeId)) {
+            // If we receive a heartbeat from a node that we already know, we don't want to request that it reconnect
+            // to the cluster because no flow has yet been elected. However, if the node has not yet voted, we want to send
+            // a reconnect request because we want this node to cast its vote for the flow, and this happens on connection
+            logger.debug("Received heartbeat for {} and node is not connected. Will not request node connect to cluster, "
+                + "though, because the Flow Election is still in progress", nodeId);
+            return;
+        }
+
         if (userDn == null) {
             reportEvent(nodeId, Severity.INFO, "Requesting that node connect to cluster");
         } else {
@@ -243,7 +259,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         request.setNodeId(nodeId);
         request.setInstanceId(instanceId);
 
-        requestReconnectionAsynchronously(request, 10, 5);
+        // If we still are requiring that an election take place, we do not want to include our local dataflow, because we don't
+        // yet know what the cluster's dataflow looks like. However, if we don't require election, then we've connected to the
+        // cluster, which means that our flow is correct.
+        final boolean includeDataFlow = !requireElection;
+        requestReconnectionAsynchronously(request, 10, 5, includeDataFlow);
     }
 
     @Override
@@ -652,7 +672,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         disconnectThread.start();
     }
 
-    private void requestReconnectionAsynchronously(final ReconnectionRequestMessage request, final int reconnectionAttempts, final int retrySeconds) {
+    private void requestReconnectionAsynchronously(final ReconnectionRequestMessage request, final int reconnectionAttempts, final int retrySeconds, final boolean includeDataFlow) {
         final Thread reconnectionThread = new Thread(new Runnable() {
             @Override
             public void run() {
@@ -675,7 +695,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
                             return;
                         }
 
-                        request.setDataFlow(new StandardDataFlow(flowService.createDataFlow()));
+                        if (includeDataFlow) {
+                            request.setDataFlow(new StandardDataFlow(flowService.createDataFlow()));
+                        }
+
                         request.setNodeConnectionStatuses(getConnectionStatuses());
                         request.setComponentRevisions(revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
 
@@ -726,9 +749,13 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     }
 
     private NodeConnectionStatusResponseMessage handleNodeConnectionStatusRequest() {
-        final NodeConnectionStatus connectionStatus = nodeStatuses.get(getLocalNodeIdentifier());
         final NodeConnectionStatusResponseMessage msg = new NodeConnectionStatusResponseMessage();
-        msg.setNodeConnectionStatus(connectionStatus);
+        final NodeIdentifier self = getLocalNodeIdentifier();
+        if (self != null) {
+            final NodeConnectionStatus connectionStatus = nodeStatuses.get(self);
+            msg.setNodeConnectionStatus(connectionStatus);
+        }
+
         return msg;
     }
 
@@ -781,6 +808,20 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         }
     }
 
+    @Override
+    public String getFlowElectionStatus() {
+        if (!requireElection) {
+            return null;
+        }
+
+        return flowElection.getStatusDescription();
+    }
+
+    @Override
+    public boolean isFlowElectionComplete() {
+        return !requireElection || flowElection.isElectionComplete();
+    }
+
     private NodeIdentifier resolveNodeId(final NodeIdentifier proposedIdentifier) {
         final NodeConnectionStatus proposedConnectionStatus = new NodeConnectionStatus(proposedIdentifier, DisconnectionCode.NOT_YET_CONNECTED);
         final NodeConnectionStatus existingStatus = nodeStatuses.putIfAbsent(proposedIdentifier, proposedConnectionStatus);
@@ -808,59 +849,86 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
     private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage) {
         final NodeIdentifier proposedIdentifier = requestMessage.getConnectionRequest().getProposedNodeIdentifier();
-        final ConnectionRequest requestWithDn = new ConnectionRequest(addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN()));
+        final NodeIdentifier withRequestorDn = addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN());
+        final DataFlow dataFlow = requestMessage.getConnectionRequest().getDataFlow();
+        final ConnectionRequest requestWithDn = new ConnectionRequest(withRequestorDn, dataFlow);
 
         // Resolve Node identifier.
         final NodeIdentifier resolvedNodeId = resolveNodeId(proposedIdentifier);
-        final ConnectionResponse response = createConnectionResponse(requestWithDn, resolvedNodeId);
+
+        if (requireElection) {
+            final DataFlow electedDataFlow = flowElection.castVote(dataFlow, withRequestorDn);
+            if (electedDataFlow == null) {
+                logger.info("Received Connection Request from {}; responding with Flow Election In Progress message", withRequestorDn);
+                return createFlowElectionInProgressResponse();
+            } else {
+                logger.info("Received Connection Request from {}; responding with DataFlow that was elected", withRequestorDn);
+                return createConnectionResponse(requestWithDn, resolvedNodeId, electedDataFlow);
+            }
+        }
+
+        logger.info("Received Connection Request from {}; responding with my DataFlow", withRequestorDn);
+        return createConnectionResponse(requestWithDn, resolvedNodeId);
+    }
+
+    private ConnectionResponseMessage createFlowElectionInProgressResponse() {
         final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
-        responseMessage.setConnectionResponse(response);
+        final String statusDescription = flowElection.getStatusDescription();
+        responseMessage.setConnectionResponse(new ConnectionResponse(5, "Cluster is still voting on which Flow is the correct flow for the cluster. " + statusDescription));
         return responseMessage;
     }
 
-    private ConnectionResponse createConnectionResponse(final ConnectionRequest request, final NodeIdentifier resolvedNodeIdentifier) {
+    private ConnectionResponseMessage createConnectionResponse(final ConnectionRequest request, final NodeIdentifier resolvedNodeIdentifier) {
+        DataFlow dataFlow = null;
+        if (flowService != null) {
+            try {
+                dataFlow = flowService.createDataFlow();
+            } catch (final IOException ioe) {
+                logger.error("Unable to obtain current dataflow from FlowService in order to provide the flow to "
+                    + resolvedNodeIdentifier + ". Will tell node to try again later", ioe);
+            }
+        }
+
+        return createConnectionResponse(request, resolvedNodeIdentifier, dataFlow);
+    }
+
+
+    private ConnectionResponseMessage createConnectionResponse(final ConnectionRequest request, final NodeIdentifier resolvedNodeIdentifier, final DataFlow clusterDataFlow) {
         if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
             // if the socket address is not listed in the firewall, then return a null response
             logger.info("Firewall blocked connection request from node " + resolvedNodeIdentifier);
-            return ConnectionResponse.createBlockedByFirewallResponse();
+            final ConnectionResponse response = ConnectionResponse.createBlockedByFirewallResponse();
+            final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
+            responseMessage.setConnectionResponse(response);
+            return responseMessage;
+        }
+
+        if (clusterDataFlow == null) {
+            final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
+            responseMessage.setConnectionResponse(new ConnectionResponse(5, "The cluster dataflow is not yet available"));
+            return responseMessage;
         }
 
         // Set node's status to 'CONNECTING'
         NodeConnectionStatus status = getConnectionStatus(resolvedNodeIdentifier);
         if (status == null) {
-            addNodeEvent(resolvedNodeIdentifier, "Connection requested from new node.  Setting status to connecting.");
+            addNodeEvent(resolvedNodeIdentifier, "Connection requested from new node. Setting status to connecting.");
         } else {
-            addNodeEvent(resolvedNodeIdentifier, "Connection requested from existing node.  Setting status to connecting");
+            addNodeEvent(resolvedNodeIdentifier, "Connection requested from existing node. Setting status to connecting.");
         }
 
         status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis());
         updateNodeStatus(status);
 
-        DataFlow dataFlow = null;
-        if (flowService != null) {
-            try {
-                dataFlow = flowService.createDataFlow();
-            } catch (final IOException ioe) {
-                logger.error("Unable to obtain current dataflow from FlowService in order to provide the flow to "
-                        + resolvedNodeIdentifier + ". Will tell node to try again later", ioe);
-            }
-        }
-
-        if (dataFlow == null) {
-            // Create try-later response based on flow retrieval delay to give
-            // the flow management service a chance to retrieve a current flow
-            final int tryAgainSeconds = 5;
-            addNodeEvent(resolvedNodeIdentifier, Severity.WARNING, "Connection requested from node, but manager was unable to obtain current flow. "
-                    + "Instructing node to try again in " + tryAgainSeconds + " seconds.");
-
-            // return try later response
-            return new ConnectionResponse(tryAgainSeconds);
-        }
-
-        return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, getConnectionStatuses(),
+        final ConnectionResponse response = new ConnectionResponse(resolvedNodeIdentifier, clusterDataFlow, instanceId, getConnectionStatuses(),
                 revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
+
+        final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
+        responseMessage.setConnectionResponse(response);
+        return responseMessage;
     }
 
+
     private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) {
         return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(),
                 nodeId.getSocketAddress(), nodeId.getSocketPort(),
@@ -959,6 +1027,16 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     @Override
     public void setConnected(final boolean connected) {
         this.connected = connected;
+
+        // Once we have connected to the cluster, election is no longer required.
+        // It is required only upon startup so that if multiple nodes are started up
+        // at the same time, and they have different flows, that we don't choose the
+        // wrong flow as the 'golden copy' by electing that node as the elected
+        // active Cluster Coordinator.
+        if (connected) {
+            logger.info("This node is now connected to the cluster. Will no longer require election of DataFlow.");
+            requireElection = false;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
index fc52ee1..2845a01 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.cluster.spring;
 
+import org.apache.nifi.cluster.coordination.flow.FlowElection;
 import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
 import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
@@ -44,8 +45,9 @@ public class NodeClusterCoordinatorFactoryBean implements FactoryBean<NodeCluste
             final ClusterNodeFirewall clusterFirewall = applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class);
             final RevisionManager revisionManager = applicationContext.getBean("revisionManager", RevisionManager.class);
             final LeaderElectionManager electionManager = applicationContext.getBean("leaderElectionManager", LeaderElectionManager.class);
+            final FlowElection flowElection = applicationContext.getBean("flowElection", FlowElection.class);
 
-            nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, clusterFirewall, revisionManager, properties);
+            nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, clusterFirewall, revisionManager, properties);
         }
 
         return nodeClusterCoordinator;

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
index 84c9deb..d261590 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -41,6 +41,10 @@
         <property name="properties" ref="nifiProperties" />
     </bean>
 
+    <bean id="flowElection" class="org.apache.nifi.cluster.coordination.flow.PopularVoteFlowElectionFactoryBean">
+        <property name="properties" ref="nifiProperties" />
+    </bean>
+
     <!-- Cluster Coordinator -->
     <bean id="clusterCoordinator" class="org.apache.nifi.cluster.spring.NodeClusterCoordinatorFactoryBean">
         <property name="properties" ref="nifiProperties"/>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
new file mode 100644
index 0000000..c01371db
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.fingerprint.FingerprintFactory;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestPopularVoteFlowElection {
+
+    @Test
+    public void testOnlyEmptyFlows() throws IOException {
+        final FingerprintFactory fingerprintFactory = Mockito.mock(FingerprintFactory.class);
+        Mockito.when(fingerprintFactory.createFingerprint(Mockito.any(byte[].class))).thenReturn("fingerprint");
+
+        final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 3, fingerprintFactory);
+        final byte[] flow = Files.readAllBytes(Paths.get("src/test/resources/conf/empty-flow.xml"));
+
+        assertFalse(election.isElectionComplete());
+        assertNull(election.getElectedDataFlow());
+        assertNull(election.castVote(createDataFlow(flow), createNodeId(1)));
+
+        assertFalse(election.isElectionComplete());
+        assertNull(election.getElectedDataFlow());
+        assertNull(election.castVote(createDataFlow(flow), createNodeId(2)));
+
+        assertFalse(election.isElectionComplete());
+        assertNull(election.getElectedDataFlow());
+
+        final DataFlow electedDataFlow = election.castVote(createDataFlow(flow), createNodeId(3));
+        assertNotNull(electedDataFlow);
+
+        assertEquals(new String(flow), new String(electedDataFlow.getFlow()));
+    }
+
+
+    @Test
+    public void testEmptyFlowIgnoredIfNonEmptyFlowExists() throws IOException {
+        final FingerprintFactory fingerprintFactory = Mockito.mock(FingerprintFactory.class);
+        Mockito.when(fingerprintFactory.createFingerprint(Mockito.any(byte[].class))).thenReturn("fingerprint");
+
+        final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 8, fingerprintFactory);
+        final byte[] emptyFlow = Files.readAllBytes(Paths.get("src/test/resources/conf/empty-flow.xml"));
+        final byte[] nonEmptyFlow = Files.readAllBytes(Paths.get("src/test/resources/conf/non-empty-flow.xml"));
+
+        for (int i = 0; i < 8; i++) {
+            assertFalse(election.isElectionComplete());
+            assertNull(election.getElectedDataFlow());
+
+            final DataFlow dataFlow;
+            if (i % 4 == 0) {
+                dataFlow = createDataFlow(nonEmptyFlow);
+            } else {
+                dataFlow = createDataFlow(emptyFlow);
+            }
+
+            final DataFlow electedDataFlow = election.castVote(dataFlow, createNodeId(i));
+            if (i == 7) {
+                assertNotNull(electedDataFlow);
+                assertEquals(new String(nonEmptyFlow), new String(electedDataFlow.getFlow()));
+            } else {
+                assertNull(electedDataFlow);
+            }
+        }
+    }
+
+
+    private NodeIdentifier createNodeId(final int index) {
+        return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + index, true);
+    }
+
+    private DataFlow createDataFlow(final byte[] flow) {
+        return new StandardDataFlow(flow, new byte[0], new byte[0]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 634ad41..e83999a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -319,6 +319,16 @@ public class TestAbstractHeartbeatMonitor {
         public boolean resetNodeStatus(NodeConnectionStatus connectionStatus, long qualifyingUpdateId) {
             return false;
         }
+
+        @Override
+        public boolean isFlowElectionComplete() {
+            return true;
+        }
+
+        @Override
+        public String getFlowElectionStatus() {
+            return null;
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index e55605b..be9b862 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -32,9 +32,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+
+import org.apache.nifi.cluster.coordination.flow.FlowElection;
 import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
+import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
@@ -77,7 +80,7 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
         Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager, createProperties()) {
+        coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
                 nodeStatuses.add(updatedStatus);
@@ -132,17 +135,19 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
         Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager, createProperties()) {
+        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
             }
         };
 
         final NodeIdentifier requestedNodeId = createNodeId(6);
-        final ConnectionRequest request = new ConnectionRequest(requestedNodeId);
+        final ConnectionRequest request = new ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new byte[0], new byte[0]));
         final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage();
         requestMsg.setConnectionRequest(request);
 
+        coordinator.setConnected(true);
+
         final ProtocolMessage protocolResponse = coordinator.handle(requestMsg);
         assertNotNull(protocolResponse);
         assertTrue(protocolResponse instanceof ConnectionResponseMessage);
@@ -170,7 +175,7 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
         Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager, createProperties()) {
+        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
             }
@@ -180,6 +185,7 @@ public class TestNodeClusterCoordinator {
         final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
         Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow);
         coordinator.setFlowService(flowService);
+        coordinator.setConnected(true);
 
         final NodeIdentifier nodeId = createNodeId(1);
         coordinator.finishNodeConnection(nodeId);
@@ -400,7 +406,7 @@ public class TestNodeClusterCoordinator {
         final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 8000, "localhost", 9000, "localhost", 10000, 11000, false);
         final NodeIdentifier conflictingId = new NodeIdentifier("1234", "localhost", 8001, "localhost", 9000, "localhost", 10000, 11000, false);
 
-        final ConnectionRequest connectionRequest = new ConnectionRequest(id1);
+        final ConnectionRequest connectionRequest = new ConnectionRequest(id1, new StandardDataFlow(new byte[0], new byte[0], new byte[0]));
         final ConnectionRequestMessage crm = new ConnectionRequestMessage();
         crm.setConnectionRequest(connectionRequest);
 
@@ -411,7 +417,7 @@ public class TestNodeClusterCoordinator {
         final NodeIdentifier resolvedNodeId = responseMessage.getConnectionResponse().getNodeIdentifier();
         assertEquals(id1, resolvedNodeId);
 
-        final ConnectionRequest conRequest2 = new ConnectionRequest(conflictingId);
+        final ConnectionRequest conRequest2 = new ConnectionRequest(conflictingId, new StandardDataFlow(new byte[0], new byte[0], new byte[0]));
         final ConnectionRequestMessage crm2 = new ConnectionRequestMessage();
         crm2.setConnectionRequest(conRequest2);
 
@@ -434,10 +440,45 @@ public class TestNodeClusterCoordinator {
     }
 
     private ProtocolMessage requestConnection(final NodeIdentifier requestedNodeId, final NodeClusterCoordinator coordinator) {
-        final ConnectionRequest request = new ConnectionRequest(requestedNodeId);
+        final ConnectionRequest request = new ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new byte[0], new byte[0]));
         final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage();
         requestMsg.setConnectionRequest(request);
         return coordinator.handle(requestMsg);
     }
 
+
+    private static class FirstVoteWinsFlowElection implements FlowElection {
+        private DataFlow dataFlow;
+        private NodeIdentifier voter;
+
+        @Override
+        public boolean isElectionComplete() {
+            return dataFlow != null;
+        }
+
+        @Override
+        public synchronized DataFlow castVote(DataFlow candidate, NodeIdentifier nodeIdentifier) {
+            if (dataFlow == null) {
+                dataFlow = candidate;
+                voter = nodeIdentifier;
+            }
+
+            return dataFlow;
+        }
+
+        @Override
+        public DataFlow getElectedDataFlow() {
+            return dataFlow;
+        }
+
+        @Override
+        public String getStatusDescription() {
+            return "First Vote Wins";
+        }
+
+        @Override
+        public boolean isVoteCounted(NodeIdentifier nodeIdentifier) {
+            return voter != null && voter.equals(nodeIdentifier);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
index ce2017e..63c3f60 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
@@ -30,7 +30,11 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.test.TestingServer;
+import org.apache.nifi.cluster.coordination.flow.FlowElection;
+import org.apache.nifi.cluster.coordination.flow.PopularVoteFlowElection;
 import org.apache.nifi.cluster.coordination.node.ClusterRoles;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.fingerprint.FingerprintFactory;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +45,22 @@ public class Cluster {
     private final Set<Node> nodes = new HashSet<>();
     private final TestingServer zookeeperServer;
 
+    private final long flowElectionTimeoutMillis;
+    private final Integer flowElectionMaxNodes;
+
     public Cluster() throws IOException {
+        this(3, TimeUnit.SECONDS, 3);
+    }
+
+    public Cluster(final long flowElectionTimeout, final TimeUnit flowElectionTimeUnit, final Integer flowElectionMaxNodes) throws IOException {
         try {
             zookeeperServer = new TestingServer();
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
+
+        this.flowElectionTimeoutMillis = flowElectionTimeUnit.toMillis(flowElectionTimeout);
+        this.flowElectionMaxNodes = flowElectionMaxNodes;
     }
 
 
@@ -116,7 +130,11 @@ public class Cluster {
         addProps.put(NiFiProperties.CLUSTER_IS_NODE, "true");
 
         final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", addProps);
-        final Node node = new Node(nifiProperties);
+
+        final FingerprintFactory fingerprintFactory = new FingerprintFactory(StringEncryptor.createEncryptor(nifiProperties));
+        final FlowElection flowElection = new PopularVoteFlowElection(flowElectionTimeoutMillis, TimeUnit.MILLISECONDS, flowElectionMaxNodes, fingerprintFactory);
+
+        final Node node = new Node(nifiProperties, flowElection);
         node.start();
         nodes.add(node);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index 7ba718b..93c9397 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -28,9 +28,11 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.cluster.ReportedEvent;
+import org.apache.nifi.cluster.coordination.flow.FlowElection;
 import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
 import org.apache.nifi.cluster.coordination.node.LeaderElectionNodeProtocolSender;
@@ -74,6 +76,7 @@ public class Node {
 
     private final List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());
     private final RevisionManager revisionManager;
+    private final FlowElection flowElection;
 
     private NodeClusterCoordinator clusterCoordinator;
     private NodeProtocolSender protocolSender;
@@ -88,11 +91,11 @@ public class Node {
     private ScheduledExecutorService executor = new FlowEngine(8, "Node tasks", true);
 
 
-    public Node(final NiFiProperties properties) {
-        this(createNodeId(), properties);
+    public Node(final NiFiProperties properties, final FlowElection flowElection) {
+        this(createNodeId(), properties, flowElection);
     }
 
-    public Node(final NodeIdentifier nodeId, final NiFiProperties properties) {
+    public Node(final NodeIdentifier nodeId, final NiFiProperties properties, final FlowElection flowElection) {
         this.nodeId = nodeId;
         this.nodeProperties = new NiFiProperties() {
             @Override
@@ -119,6 +122,7 @@ public class Node {
         Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
         electionManager = new CuratorLeaderElectionManager(4, nodeProperties);
+        this.flowElection = flowElection;
     }
 
 
@@ -132,7 +136,7 @@ public class Node {
         protocolSender = createNodeProtocolSender();
         clusterCoordinator = createClusterCoordinator();
         clusterCoordinator.setLocalNodeIdentifier(nodeId);
-        clusterCoordinator.setConnected(true);
+        //        clusterCoordinator.setConnected(true);
 
         final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor();
         flowController = FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties,
@@ -273,7 +277,7 @@ public class Node {
         }
 
         final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener);
-        return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, null, revisionManager, nodeProperties);
+        return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null, revisionManager, nodeProperties);
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/empty-flow.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/empty-flow.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/empty-flow.xml
new file mode 100644
index 0000000..c0cb6de
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/empty-flow.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+        http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<flowController encoding-version="1.0">
+  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+  <rootGroup>
+    <id>00000000-0000-0000-0000-000000000000</id>
+    <name>Empty NiFi Flow</name>
+    <position x="0.0" y="0.0"/>
+    <comment/>
+  </rootGroup>
+  <controllerServices/>
+  <reportingTasks/>
+</flowController>
\ No newline at end of file