You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2021/02/05 20:20:11 UTC

[nifi] 02/02: NIFI-8204, NIFI-7866: Send revision update count in heartbeats. If update count in heartbeat is greater than that of cluster coordinator, request that node reconnect to get most up-to-date revisions. Cannot check exact equality, as the values may change between the time a heartbeat is created and the time the cluster coordinator receives it. However, it should be safe to assume that the revision won't be greater than that of the cluster coordinator. There is a tiny window in which it could [...]

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 749d05840ba88efc8b42f5434d9223104edfab68
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Feb 5 10:53:53 2021 -0500

    NIFI-8204, NIFI-7866: Send revision update count in heartbeats. If update count in heartbeat is greater than that of cluster coordinator, request that node reconnect to get most up-to-date revisions. Cannot check exact equality, as the values may change between the time a heartbeat is created and the time the cluster coordinator receives it. However, it should be safe to assume that the revision won't be greater than that of the cluster coordinator. There is a tiny window in which it  [...]
    
    This closes #4806.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../cluster/coordination/ClusterCoordinator.java   |  7 +++
 .../coordination/heartbeat/NodeHeartbeat.java      |  5 ++
 .../protocol/ComponentRevisionSnapshot.java        | 66 +++++++++++++++++++++
 .../nifi/cluster/protocol/ConnectionResponse.java  | 14 ++---
 .../nifi/cluster/protocol/HeartbeatPayload.java    | 24 +++++---
 .../jaxb/message/AdaptedConnectionResponse.java    | 15 +++--
 .../message/ReconnectionRequestMessage.java        | 17 +++---
 .../jaxb/message/TestJaxbProtocolUtils.java        | 48 +++++++++------
 .../heartbeat/AbstractHeartbeatMonitor.java        |  2 +
 .../heartbeat/ClusterProtocolHeartbeatMonitor.java |  5 +-
 .../heartbeat/StandardNodeHeartbeat.java           | 11 +++-
 .../coordination/node/NodeClusterCoordinator.java  | 26 +++++++--
 .../heartbeat/TestAbstractHeartbeatMonitor.java    |  2 +-
 .../node/TestNodeClusterCoordinator.java           | 10 ++--
 .../org/apache/nifi/cluster/integration/Node.java  |  7 ++-
 .../apache/nifi/web/revision/RevisionManager.java  | 12 ++--
 .../apache/nifi/web/revision/RevisionSnapshot.java | 68 ++++++++++++++++++++++
 .../org/apache/nifi/controller/FlowController.java | 23 ++++----
 .../nifi/controller/StandardFlowService.java       | 40 +++++++------
 .../nifi/spring/FlowControllerFactoryBean.java     |  9 ++-
 .../src/main/resources/nifi-context.xml            |  1 +
 .../nifi/controller/StandardFlowServiceTest.java   | 17 +++---
 .../nifi/controller/StandardProcessorNodeIT.java   | 56 +++++++++---------
 .../reporting/TestStandardReportingContext.java    | 17 +++---
 .../nifi/integration/FrameworkIntegrationTest.java |  3 +-
 .../apache/nifi/headless/HeadlessNiFiServer.java   |  3 +-
 .../nifi/web/revision/NaiveRevisionManager.java    | 43 +++++++++-----
 27 files changed, 389 insertions(+), 162 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index 7f5a8f1..2b7c07f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.cluster.coordination;
 
+import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
 import org.apache.nifi.cluster.coordination.node.OffloadCode;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
@@ -277,6 +278,12 @@ public interface ClusterCoordinator {
     void registerEventListener(ClusterTopologyEventListener eventListener);
 
     /**
+     * Validates that the heartbeat is valid and if not takes appropriate action to rectify
+     */
+    default void validateHeartbeat(NodeHeartbeat nodeHeartbeat) {
+    }
+
+    /**
      * Stops notifying the given listener when cluster topology events occurs
      * @param eventListener the event listener to stop notifying
      */
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
index c4413a2..28ffdc6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
@@ -56,4 +56,9 @@ public interface NodeHeartbeat {
      * @return the time that the node reports having started NiFi
      */
     long getSystemStartTime();
+
+    /**
+     * @return the number of updates that have occurred to the Revision Manager
+     */
+    long getRevisionUpdateCount();
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ComponentRevisionSnapshot.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ComponentRevisionSnapshot.java
new file mode 100644
index 0000000..4c442be
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ComponentRevisionSnapshot.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol;
+
+import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.revision.RevisionSnapshot;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ComponentRevisionSnapshot {
+    private List<ComponentRevision> componentRevisions;
+    private Long revisionUpdateCount;
+
+    public List<ComponentRevision> getComponentRevisions() {
+        return componentRevisions;
+    }
+
+    public void setComponentRevisions(final List<ComponentRevision> componentRevisions) {
+        this.componentRevisions = componentRevisions;
+    }
+
+    public Long getRevisionUpdateCount() {
+        return revisionUpdateCount;
+    }
+
+    public void setRevisionUpdateCount(final Long revisionUpdateCount) {
+        this.revisionUpdateCount = revisionUpdateCount;
+    }
+
+    public static ComponentRevisionSnapshot fromRevisionSnapshot(final RevisionSnapshot revisionSnapshot) {
+        final List<ComponentRevision> componentRevisions = revisionSnapshot.getRevisions().stream()
+            .map(ComponentRevision::fromRevision)
+            .collect(Collectors.toList());
+
+        final ComponentRevisionSnapshot componentRevisionSnapshot = new ComponentRevisionSnapshot();
+        componentRevisionSnapshot.setComponentRevisions(componentRevisions);
+        componentRevisionSnapshot.setRevisionUpdateCount(revisionSnapshot.getRevisionUpdateCount());
+        return componentRevisionSnapshot;
+    }
+
+    public RevisionSnapshot toRevisionSnapshot() {
+        final List<Revision> revisions = componentRevisions == null ? Collections.emptyList() : componentRevisions.stream()
+            .map(ComponentRevision::toRevision)
+            .collect(Collectors.toList());
+        final long updateCount = revisionUpdateCount == null ? 0L : revisionUpdateCount;
+
+        return new RevisionSnapshot(revisions, updateCount);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
index c0717a9..a6031c2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
@@ -41,11 +41,11 @@ public class ConnectionResponse {
     private final DataFlow dataFlow;
     private final String instanceId;
     private final List<NodeConnectionStatus> nodeStatuses;
-    private final List<ComponentRevision> componentRevisions;
+    private final ComponentRevisionSnapshot revisionSnapshot;
 
 
     public ConnectionResponse(final NodeIdentifier nodeIdentifier, final DataFlow dataFlow,
-        final String instanceId, final List<NodeConnectionStatus> nodeStatuses, final List<ComponentRevision> componentRevisions) {
+        final String instanceId, final List<NodeConnectionStatus> nodeStatuses, final ComponentRevisionSnapshot revisionSnapshot) {
 
         if (nodeIdentifier == null) {
             throw new IllegalArgumentException("Node identifier may not be empty or null.");
@@ -57,7 +57,7 @@ public class ConnectionResponse {
         this.rejectionReason = null;
         this.instanceId = instanceId;
         this.nodeStatuses = Collections.unmodifiableList(new ArrayList<>(nodeStatuses));
-        this.componentRevisions = componentRevisions == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(componentRevisions));
+        this.revisionSnapshot = revisionSnapshot;
     }
 
     public ConnectionResponse(final int tryLaterSeconds, final String explanation) {
@@ -70,7 +70,7 @@ public class ConnectionResponse {
         this.rejectionReason = explanation;
         this.instanceId = null;
         this.nodeStatuses = null;
-        this.componentRevisions = null;
+        this.revisionSnapshot = null;
     }
 
     private ConnectionResponse(final String rejectionReason) {
@@ -80,7 +80,7 @@ public class ConnectionResponse {
         this.rejectionReason = rejectionReason;
         this.instanceId = null;
         this.nodeStatuses = null;
-        this.componentRevisions = null;
+        this.revisionSnapshot = null;
     }
 
     public static ConnectionResponse createBlockedByFirewallResponse() {
@@ -123,7 +123,7 @@ public class ConnectionResponse {
         return nodeStatuses;
     }
 
-    public List<ComponentRevision> getComponentRevisions() {
-        return componentRevisions;
+    public ComponentRevisionSnapshot getComponentRevisions() {
+        return revisionSnapshot;
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/HeartbeatPayload.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/HeartbeatPayload.java
index 20848be..16aac26 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/HeartbeatPayload.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/HeartbeatPayload.java
@@ -16,11 +16,9 @@
  */
 package org.apache.nifi.cluster.protocol;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.security.xml.XmlUtils;
+
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
@@ -28,8 +26,11 @@ import javax.xml.bind.Unmarshaller;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamReader;
-import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
-import org.apache.nifi.security.xml.XmlUtils;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
 
 /**
  * The payload of the heartbeat. The payload contains status to inform the cluster manager the current workload of this node.
@@ -53,6 +54,7 @@ public class HeartbeatPayload {
     private long totalFlowFileBytes;
     private long systemStartTime;
     private List<NodeConnectionStatus> clusterStatus;
+    private long revisionUpdateCount;
 
     public int getActiveThreadCount() {
         return activeThreadCount;
@@ -94,6 +96,14 @@ public class HeartbeatPayload {
         this.clusterStatus = clusterStatus;
     }
 
+    public long getRevisionUpdateCount() {
+        return revisionUpdateCount;
+    }
+
+    public void setRevisionUpdateCount(final long revisionUpdateCount) {
+        this.revisionUpdateCount = revisionUpdateCount;
+    }
+
     public byte[] marshal() throws ProtocolException {
         final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
         marshal(this, payloadBytes);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
index 513818b..248c56b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
@@ -16,15 +16,14 @@
  */
 package org.apache.nifi.cluster.protocol.jaxb.message;
 
-import java.util.List;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
-import org.apache.nifi.cluster.protocol.ComponentRevision;
+import org.apache.nifi.cluster.protocol.ComponentRevisionSnapshot;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import java.util.List;
+
 /**
  */
 public class AdaptedConnectionResponse {
@@ -35,7 +34,7 @@ public class AdaptedConnectionResponse {
     private int tryLaterSeconds;
     private String instanceId;
     private List<NodeConnectionStatus> nodeStatuses;
-    private List<ComponentRevision> componentRevisions;
+    private ComponentRevisionSnapshot componentRevisions;
 
     public AdaptedConnectionResponse() {
     }
@@ -94,11 +93,11 @@ public class AdaptedConnectionResponse {
         return this.nodeStatuses;
     }
 
-    public List<ComponentRevision> getComponentRevisions() {
+    public ComponentRevisionSnapshot getComponentRevisions() {
         return componentRevisions;
     }
 
-    public void setComponentRevisions(List<ComponentRevision> componentRevisions) {
+    public void setComponentRevisions(ComponentRevisionSnapshot componentRevisions) {
         this.componentRevisions = componentRevisions;
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
index e443552..2320885 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
@@ -16,17 +16,16 @@
  */
 package org.apache.nifi.cluster.protocol.message;
 
-import java.util.List;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
-import org.apache.nifi.cluster.protocol.ComponentRevision;
+import org.apache.nifi.cluster.protocol.ComponentRevisionSnapshot;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
 
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import java.util.List;
+
 /**
  */
 @XmlRootElement(name = "reconnectionRequestMessage")
@@ -37,7 +36,7 @@ public class ReconnectionRequestMessage extends ProtocolMessage {
     private boolean primary;
     private String instanceId;
     private List<NodeConnectionStatus> nodeStatuses;
-    private List<ComponentRevision> componentRevisions;
+    private ComponentRevisionSnapshot componentRevisions;
 
     public ReconnectionRequestMessage() {
     }
@@ -88,11 +87,11 @@ public class ReconnectionRequestMessage extends ProtocolMessage {
         return nodeStatuses;
     }
 
-    public List<ComponentRevision> getComponentRevisions() {
+    public ComponentRevisionSnapshot getComponentRevisions() {
         return componentRevisions;
     }
 
-    public void setComponentRevisions(List<ComponentRevision> componentRevisions) {
+    public void setComponentRevisions(ComponentRevisionSnapshot componentRevisions) {
         this.componentRevisions = componentRevisions;
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
index 7ced87e..d45bf4a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
@@ -17,39 +17,39 @@
 
 package org.apache.nifi.cluster.protocol.jaxb.message;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.IntStream;
-
-import javax.xml.bind.JAXBException;
-
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.coordination.node.NodeWorkload;
 import org.apache.nifi.cluster.protocol.ComponentRevision;
+import org.apache.nifi.cluster.protocol.ComponentRevisionSnapshot;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
 import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
 import org.apache.nifi.web.Revision;
 import org.junit.Test;
 
+import javax.xml.bind.JAXBException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestJaxbProtocolUtils {
 
     @Test
@@ -61,20 +61,30 @@ public class TestJaxbProtocolUtils {
         final DataFlow dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>());
         final List<NodeConnectionStatus> nodeStatuses = Collections.singletonList(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
         final List<ComponentRevision> componentRevisions = Collections.singletonList(ComponentRevision.fromRevision(new Revision(8L, "client-1", "component-1")));
-        msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, "instance-1", nodeStatuses, componentRevisions));
+        final ComponentRevisionSnapshot revisionSnapshot = new ComponentRevisionSnapshot();
+        revisionSnapshot.setRevisionUpdateCount(12L);
+        revisionSnapshot.setComponentRevisions(componentRevisions);
+
+        final ComponentRevisionSnapshot componentRevisionSnapshot = new ComponentRevisionSnapshot();
+        componentRevisionSnapshot.setComponentRevisions(componentRevisions);
+        componentRevisionSnapshot.setRevisionUpdateCount(12L);
+
+        msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, "instance-1", nodeStatuses, componentRevisionSnapshot));
 
         JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
         final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
         assertTrue(unmarshalled instanceof ConnectionResponseMessage);
         final ConnectionResponseMessage unmarshalledMsg = (ConnectionResponseMessage) unmarshalled;
 
-        final List<ComponentRevision> revisions = msg.getConnectionResponse().getComponentRevisions();
+        final ComponentRevisionSnapshot receivedSnapshot = msg.getConnectionResponse().getComponentRevisions();
+        final List<ComponentRevision> revisions = receivedSnapshot.getComponentRevisions();
         assertEquals(1, revisions.size());
         assertEquals(8L, revisions.get(0).getVersion().longValue());
         assertEquals("client-1", revisions.get(0).getClientId());
         assertEquals("component-1", revisions.get(0).getComponentId());
 
-        assertEquals(revisions, unmarshalledMsg.getConnectionResponse().getComponentRevisions());
+        assertEquals(revisionSnapshot.getComponentRevisions(), receivedSnapshot.getComponentRevisions());
+        assertEquals(revisionSnapshot.getRevisionUpdateCount(), receivedSnapshot.getRevisionUpdateCount());
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 55d1e30..2b02a49 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -292,6 +292,8 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
             clusterCoordinator.finishNodeConnection(nodeId);
             clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected.");
         }
+
+        clusterCoordinator.validateHeartbeat(heartbeat);
     }
 
     /**
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index 43f3f2b..e22116c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -102,7 +102,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
         heartbeatMessages.clear();
         for (final NodeIdentifier nodeId : clusterCoordinator.getNodeIdentifiers()) {
             final NodeHeartbeat heartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(),
-                    clusterCoordinator.getConnectionStatus(nodeId), 0, 0L, 0, System.currentTimeMillis());
+                    clusterCoordinator.getConnectionStatus(nodeId), 0, 0L, 0, System.currentTimeMillis(), 0L);
             heartbeatMessages.put(nodeId, heartbeat);
         }
     }
@@ -158,9 +158,10 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
         final int flowFileCount = (int) payload.getTotalFlowFileCount();
         final long flowFileBytes = payload.getTotalFlowFileBytes();
         final long systemStartTime = payload.getSystemStartTime();
+        final long revisionUpdateCount = payload.getRevisionUpdateCount();
 
         final NodeHeartbeat nodeHeartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(),
-                connectionStatus, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime);
+                connectionStatus, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime, revisionUpdateCount);
         heartbeatMessages.put(heartbeat.getNodeIdentifier(), nodeHeartbeat);
         logger.debug("Received new heartbeat from {}", nodeId);
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
index a63db44..9fd06b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
@@ -32,9 +32,10 @@ public class StandardNodeHeartbeat implements NodeHeartbeat {
     private final long flowFileBytes;
     private final int activeThreadCount;
     private final long systemStartTime;
+    private final long revisionUpdateCount;
 
     public StandardNodeHeartbeat(final NodeIdentifier nodeId, final long timestamp, final NodeConnectionStatus connectionStatus,
-        final int flowFileCount, final long flowFileBytes, final int activeThreadCount, final long systemStartTime) {
+        final int flowFileCount, final long flowFileBytes, final int activeThreadCount, final long systemStartTime, final long revisionUpdateCount) {
         this.timestamp = timestamp;
         this.nodeId = nodeId;
         this.connectionStatus = connectionStatus;
@@ -42,6 +43,7 @@ public class StandardNodeHeartbeat implements NodeHeartbeat {
         this.flowFileBytes = flowFileBytes;
         this.activeThreadCount = activeThreadCount;
         this.systemStartTime = systemStartTime;
+        this.revisionUpdateCount = revisionUpdateCount;
     }
 
     @Override
@@ -79,12 +81,17 @@ public class StandardNodeHeartbeat implements NodeHeartbeat {
         return systemStartTime;
     }
 
+    @Override
+    public long getRevisionUpdateCount() {
+        return revisionUpdateCount;
+    }
+
     public static StandardNodeHeartbeat fromHeartbeatMessage(final HeartbeatMessage message, final long timestamp) {
         final Heartbeat heartbeat = message.getHeartbeat();
         final HeartbeatPayload payload = HeartbeatPayload.unmarshal(heartbeat.getPayload());
 
         return new StandardNodeHeartbeat(heartbeat.getNodeIdentifier(), timestamp, heartbeat.getConnectionStatus(),
             (int) payload.getTotalFlowFileCount(), payload.getTotalFlowFileBytes(),
-            payload.getActiveThreadCount(), payload.getSystemStartTime());
+            payload.getActiveThreadCount(), payload.getSystemStartTime(), payload.getRevisionUpdateCount());
     }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 6d8a2ed..d029034 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
 import org.apache.nifi.cluster.coordination.flow.FlowElection;
+import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
 import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
 import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
 import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
@@ -36,7 +37,7 @@ import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
 import org.apache.nifi.cluster.manager.exception.IllegalNodeOffloadException;
-import org.apache.nifi.cluster.protocol.ComponentRevision;
+import org.apache.nifi.cluster.protocol.ComponentRevisionSnapshot;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.DataFlow;
@@ -956,6 +957,22 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         disconnectThread.start();
     }
 
+    public void validateHeartbeat(final NodeHeartbeat heartbeat) {
+        final long localUpdateCount = revisionManager.getRevisionUpdateCount();
+        final long nodeUpdateCount = heartbeat.getRevisionUpdateCount();
+
+        if (nodeUpdateCount > localUpdateCount) {
+            // If the node's Revision Update Count is larger than ours, it indicates that the node has the incorrect set of Revisions.
+            // This can happen, for instance, if the node connects to the cluster at the same time that a new node is joining and becoming the Cluster Coordinator.
+            // This case is very rare but can occur on occasion. As a result, we check for that here and if it occurs, request that the node disconnect so that
+            // it can reconnect.
+            final String message = String.format("Node has a Revision Update Count of %s but local value is only %s. Node appears not to have the appropriate set of Component Revisions",
+                heartbeat.getRevisionUpdateCount(), localUpdateCount);
+            logger.warn("Requesting that {} reconnect to the cluster due to: {}", heartbeat.getNodeIdentifier(), message);
+            requestNodeConnect(heartbeat.getNodeIdentifier(), null);
+        }
+    }
+
     private void requestReconnectionAsynchronously(final ReconnectionRequestMessage request, final int reconnectionAttempts, final int retrySeconds, final boolean includeDataFlow) {
         final Thread reconnectionThread = new Thread(new Runnable() {
             @Override
@@ -984,7 +1001,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
                         }
 
                         request.setNodeConnectionStatuses(getConnectionStatuses());
-                        request.setComponentRevisions(revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
+                        final ComponentRevisionSnapshot componentRevisionSnapshot = ComponentRevisionSnapshot.fromRevisionSnapshot(revisionManager.getAllRevisions());
+                        request.setComponentRevisions(componentRevisionSnapshot);
 
                         // Issue a reconnection request to the node.
                         senderListener.requestReconnection(request);
@@ -1227,8 +1245,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, null, System.currentTimeMillis());
         updateNodeStatus(status);
 
-        final ConnectionResponse response = new ConnectionResponse(resolvedNodeIdentifier, clusterDataFlow, instanceId, getConnectionStatuses(),
-                revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
+        final ComponentRevisionSnapshot componentRevisionSnapshot = ComponentRevisionSnapshot.fromRevisionSnapshot(revisionManager.getAllRevisions());
+        final ConnectionResponse response = new ConnectionResponse(resolvedNodeIdentifier, clusterDataFlow, instanceId, getConnectionStatuses(), componentRevisionSnapshot);
 
         final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
         responseMessage.setConnectionResponse(response);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 0a2a509..98aa0b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -209,7 +209,7 @@ public class TestAbstractHeartbeatMonitor {
 
     private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) {
         final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state);
-        return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, 0, 0, 0, 0);
+        return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, 0, 0, 0, 0, 0L);
     }
 
     private TestFriendlyHeartbeatMonitor createMonitor(final ClusterCoordinator coordinator) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index 1bca6fb..0d7b183 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -37,6 +37,7 @@ import org.apache.nifi.services.FlowService;
 import org.apache.nifi.state.MockStateMap;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.revision.RevisionManager;
+import org.apache.nifi.web.revision.RevisionSnapshot;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -70,6 +71,7 @@ public class TestNodeClusterCoordinator {
     private ClusterCoordinationProtocolSenderListener senderListener;
     private List<NodeConnectionStatus> nodeStatuses;
     private StateManagerProvider stateManagerProvider;
+    private final RevisionSnapshot emptyRevisionSnapshot = new RevisionSnapshot(Collections.emptyList(), 0L);
 
     private NiFiProperties createProperties() {
         final Map<String,String> addProps = new HashMap<>();
@@ -92,7 +94,7 @@ public class TestNodeClusterCoordinator {
 
         final EventReporter eventReporter = Mockito.mock(EventReporter.class);
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
-        when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+        when(revisionManager.getAllRevisions()).thenReturn(emptyRevisionSnapshot);
 
         coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties(), null, stateManagerProvider) {
             @Override
@@ -147,7 +149,7 @@ public class TestNodeClusterCoordinator {
         final ClusterCoordinationProtocolSenderListener senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
         final EventReporter eventReporter = Mockito.mock(EventReporter.class);
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
-        when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+        when(revisionManager.getAllRevisions()).thenReturn(emptyRevisionSnapshot);
 
         final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(),
                 null, revisionManager, createProperties(), null, stateManagerProvider) {
@@ -188,7 +190,7 @@ public class TestNodeClusterCoordinator {
 
         final EventReporter eventReporter = Mockito.mock(EventReporter.class);
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
-        when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+        when(revisionManager.getAllRevisions()).thenReturn(emptyRevisionSnapshot);
 
         final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(),
                 null, revisionManager, createProperties(), null, stateManagerProvider) {
@@ -245,7 +247,7 @@ public class TestNodeClusterCoordinator {
     @Test(timeout = 5000)
     public void testStatusChangesReplicated() throws InterruptedException, IOException {
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
-        when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+        when(revisionManager.getAllRevisions()).thenReturn(emptyRevisionSnapshot);
 
         // Create a connection request message and send to the coordinator
         final NodeIdentifier requestedNodeId = createNodeId(1);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index ea181ac..10cb010 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -60,6 +60,7 @@ import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.revision.RevisionManager;
+import org.apache.nifi.web.revision.RevisionSnapshot;
 import org.junit.Assert;
 import org.mockito.Mockito;
 
@@ -131,7 +132,8 @@ public class Node {
         this.extensionManager = extensionManager;
 
         revisionManager = Mockito.mock(RevisionManager.class);
-        Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+        RevisionSnapshot revisionSnapshot = new RevisionSnapshot(Collections.emptyList(), 0L);
+        Mockito.when(revisionManager.getAllRevisions()).thenReturn(revisionSnapshot);
 
         electionManager = new CuratorLeaderElectionManager(4, nodeProperties);
         this.flowElection = flowElection;
@@ -166,7 +168,8 @@ public class Node {
         final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor();
         flowController = FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties,
             null, null, createEncryptorFromProperties(nodeProperties), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator,
-            heartbeatMonitor, electionManager, VariableRegistry.EMPTY_REGISTRY, Mockito.mock(FlowRegistryClient.class), extensionManager);
+            heartbeatMonitor, electionManager, VariableRegistry.EMPTY_REGISTRY, Mockito.mock(FlowRegistryClient.class), extensionManager,
+            revisionManager);
 
         try {
             flowController.initializeFlow();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
index 357f56a..6734c1d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
@@ -17,9 +17,6 @@
 
 package org.apache.nifi.web.revision;
 
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.web.Revision;
 
@@ -104,10 +101,15 @@ public interface RevisionManager {
      * Clears any revisions that are currently held and resets the Revision Manager so that the revisions
      * present are those provided in the given collection
      */
-    void reset(Collection<Revision> revisions);
+    void reset(RevisionSnapshot revisionSnapshot);
 
     /**
      * @return a List of all Revisions managed by this Revision Manager
      */
-    List<Revision> getAllRevisions();
+    RevisionSnapshot getAllRevisions();
+
+    /**
+     * Returns the number of updates that have occurred
+     */
+    long getRevisionUpdateCount();
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionSnapshot.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionSnapshot.java
new file mode 100644
index 0000000..0a13eb3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionSnapshot.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.revision;
+
+import org.apache.nifi.web.Revision;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Objects;
+
+public class RevisionSnapshot {
+    private final Collection<Revision> revisions;
+    private final long revisionUpdateCount;
+
+    public RevisionSnapshot(final Collection<Revision> revisions, final long revisionUpdateCount) {
+        this.revisions = new ArrayList<>(revisions);
+        this.revisionUpdateCount = revisionUpdateCount;
+    }
+
+    public Collection<Revision> getRevisions() {
+        return revisions;
+    }
+
+    public long getRevisionUpdateCount() {
+        return revisionUpdateCount;
+    }
+
+    @Override
+    public String toString() {
+        return "RevisionSnapshot[" +
+            "revisionUpdateCount=" + revisionUpdateCount +
+            ", revisions=" + revisions +
+            "]";
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        final RevisionSnapshot that = (RevisionSnapshot) o;
+        return revisionUpdateCount == that.revisionUpdateCount && revisions.equals(that.revisions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(revisions, revisionUpdateCount);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 011156f..fcefcd3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -188,6 +188,7 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.util.concurrency.TimedLock;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.revision.RevisionManager;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -235,10 +236,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION = "org.apache.nifi.controller.FileSystemSwapManager";
     public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
 
-    public static final String SCHEDULE_MINIMUM_NANOSECONDS = "flowcontroller.minimum.nanoseconds";
     public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds";
     public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
-    public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 5-minute captures
 
 
     // default properties for scaling the positions of components from pre-1.0 flow encoding versions.
@@ -274,18 +273,14 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     private final StateManagerProvider stateManagerProvider;
     private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
     private final VariableRegistry variableRegistry;
+    private final RevisionManager revisionManager;
 
     private final ConnectionLoadBalanceServer loadBalanceServer;
     private final NioAsyncLoadBalanceClientRegistry loadBalanceClientRegistry;
     private final FlowEngine loadBalanceClientThreadPool;
     private final Set<NioAsyncLoadBalanceClientTask> loadBalanceClientTasks = new HashSet<>();
 
-    private final ConcurrentMap<String, ProcessorNode> allProcessors = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, ProcessGroup> allProcessGroups = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Connection> allConnections = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Port> allInputPorts = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Port> allOutputPorts = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Funnel> allFunnels = new ConcurrentHashMap<>();
 
     private final ZooKeeperStateServer zooKeeperStateServer;
 
@@ -399,7 +394,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
                 /* leader election manager */ null,
                 /* variable registry */ variableRegistry,
                 flowRegistryClient,
-                extensionManager);
+                extensionManager,
+                null);
     }
 
     public static FlowController createClusteredInstance(
@@ -415,7 +411,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             final LeaderElectionManager leaderElectionManager,
             final VariableRegistry variableRegistry,
             final FlowRegistryClient flowRegistryClient,
-            final ExtensionManager extensionManager) {
+            final ExtensionManager extensionManager,
+            final RevisionManager revisionManager) {
 
         final FlowController flowController = new FlowController(
                 flowFileEventRepo,
@@ -431,7 +428,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
                 leaderElectionManager,
                 variableRegistry,
                 flowRegistryClient,
-                extensionManager);
+                extensionManager,
+                revisionManager);
 
         return flowController;
     }
@@ -451,7 +449,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             final LeaderElectionManager leaderElectionManager,
             final VariableRegistry variableRegistry,
             final FlowRegistryClient flowRegistryClient,
-            final ExtensionManager extensionManager) {
+            final ExtensionManager extensionManager,
+            final RevisionManager revisionManager) {
 
         maxTimerDrivenThreads = new AtomicInteger(10);
         maxEventDrivenThreads = new AtomicInteger(1);
@@ -466,6 +465,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         this.auditService = auditService;
         this.configuredForClustering = configuredForClustering;
         this.flowRegistryClient = flowRegistryClient;
+        this.revisionManager = revisionManager;
 
         try {
             // Form the container object from the properties
@@ -2896,6 +2896,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             final HeartbeatPayload hbPayload = new HeartbeatPayload();
             hbPayload.setSystemStartTime(systemStartTime);
             hbPayload.setActiveThreadCount(getActiveThreadCount());
+            hbPayload.setRevisionUpdateCount(revisionManager.getRevisionUpdateCount());
 
             final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
             hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index fae37a5..ce9fbcf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -29,7 +29,7 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.coordination.node.OffloadCode;
 import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
-import org.apache.nifi.cluster.protocol.ComponentRevision;
+import org.apache.nifi.cluster.protocol.ComponentRevisionSnapshot;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.DataFlow;
@@ -73,6 +73,7 @@ import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.file.FileUtils;
 import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.revision.RevisionManager;
+import org.apache.nifi.web.revision.RevisionSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -881,22 +882,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         try {
             logger.info("Connecting Node: " + nodeId);
 
-            // Upon NiFi startup, the node will register for the Cluster Coordinator role with the Leader Election Manager.
-            // Sometimes the node will register as an active participant, meaning that it wants to be elected. This happens when the entire cluster starts up,
-            // for example. (This is determined by checking whether or not there already is a Cluster Coordinator registered).
-            // Other times, it registers as a 'silent' member, meaning that it will not be elected.
-            // If the leader election timeout is long (say 30 or 60 seconds), it is possible that this node was the Leader and was then restarted,
-            // and upon restart found that itself was already registered as the Cluster Coordinator. As a result, it registers as a Silent member of the
-            // election, and then connects to itself as the Cluster Coordinator. At this point, since the node has just restarted, it doesn't know about
-            // any of the nodes in the cluster. As a result, it will get the Cluster Topology from itself, and think there are no other nodes in the cluster.
-            // This causes all other nodes to send in their heartbeats, which then results in them being disconnected because they were previously unknown and
-            // as a result asked to reconnect to the cluster.
-            //
-            // To avoid this, we do not allow the node to connect to itself if it's not an active participant. This means that when the entire cluster is started
-            // up, the node can still connect to itself because it will be an active participant. But if it is then restarted, it won't be allowed to connect
-            // to itself. It will instead have to wait until another node is elected Cluster Coordinator.
-            final boolean activeCoordinatorParticipant = controller.getLeaderElectionManager().isActiveParticipant(ClusterRoles.CLUSTER_COORDINATOR);
-
             // create connection request message
             final ConnectionRequest request = new ConnectionRequest(nodeId, dataFlow);
 
@@ -917,6 +902,22 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             ConnectionResponse response = null;
             for (int i = 0; i < maxAttempts || retryIndefinitely; i++) {
                 try {
+                    // Upon NiFi startup, the node will register for the Cluster Coordinator role with the Leader Election Manager.
+                    // Sometimes the node will register as an active participant, meaning that it wants to be elected. This happens when the entire cluster starts up,
+                    // for example. (This is determined by checking whether or not there already is a Cluster Coordinator registered).
+                    // Other times, it registers as a 'silent' member, meaning that it will not be elected.
+                    // If the leader election timeout is long (say 30 or 60 seconds), it is possible that this node was the Leader and was then restarted,
+                    // and upon restart found that itself was already registered as the Cluster Coordinator. As a result, it registers as a Silent member of the
+                    // election, and then connects to itself as the Cluster Coordinator. At this point, since the node has just restarted, it doesn't know about
+                    // any of the nodes in the cluster. As a result, it will get the Cluster Topology from itself, and think there are no other nodes in the cluster.
+                    // This causes all other nodes to send in their heartbeats, which then results in them being disconnected because they were previously unknown and
+                    // as a result asked to reconnect to the cluster.
+                    //
+                    // To avoid this, we do not allow the node to connect to itself if it's not an active participant. This means that when the entire cluster is started
+                    // up, the node can still connect to itself because it will be an active participant. But if it is then restarted, it won't be allowed to connect
+                    // to itself. It will instead have to wait until another node is elected Cluster Coordinator.
+                    final boolean activeCoordinatorParticipant = controller.getLeaderElectionManager().isActiveParticipant(ClusterRoles.CLUSTER_COORDINATOR);
+
                     response = senderListener.requestConnection(requestMsg, activeCoordinatorParticipant).getConnectionResponse();
 
                     if (response.shouldTryLater()) {
@@ -1022,7 +1023,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             // set node ID on controller before we start heartbeating because heartbeat needs node ID
             clusterCoordinator.setLocalNodeIdentifier(nodeId);
             clusterCoordinator.setConnected(true);
-            revisionManager.reset(response.getComponentRevisions().stream().map(ComponentRevision::toRevision).collect(Collectors.toList()));
+
+            final ComponentRevisionSnapshot componentRevisionSnapshot = response.getComponentRevisions();
+            final RevisionSnapshot revisionSnapshot = componentRevisionSnapshot.toRevisionSnapshot();
+            revisionManager.reset(revisionSnapshot);
 
             // mark the node as clustered
             controller.setClustered(true, response.getInstanceId());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
index 2124daf..b21bee4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
@@ -30,6 +30,7 @@ import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.revision.RevisionManager;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.FactoryBean;
 import org.springframework.context.ApplicationContext;
@@ -53,6 +54,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
     private LeaderElectionManager leaderElectionManager;
     private FlowRegistryClient flowRegistryClient;
     private ExtensionManager extensionManager;
+    private RevisionManager revisionManager;
 
     @Override
     public Object getObject() throws Exception {
@@ -75,7 +77,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
                     leaderElectionManager,
                     variableRegistry,
                     flowRegistryClient,
-                    extensionManager);
+                    extensionManager,
+                    revisionManager);
             } else {
                 flowController = FlowController.createStandaloneInstance(
                     flowFileEventRepository,
@@ -150,4 +153,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
     public void setExtensionManager(ExtensionManager extensionManager) {
         this.extensionManager = extensionManager;
     }
+
+    public void setRevisionManager(final RevisionManager revisionManager) {
+        this.revisionManager = revisionManager;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
index 89a7b0d..08f663a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
@@ -56,6 +56,7 @@
         <property name="leaderElectionManager" ref="leaderElectionManager" />
         <property name="flowRegistryClient" ref="flowRegistryClient" />
         <property name="extensionManager" ref="extensionManager" />
+        <property name="revisionManager" ref="revisionManager" />
     </bean>
 
     <!-- flow service -->
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
index 75b85ea..7d2ac7e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
@@ -16,14 +16,6 @@
  */
 package org.apache.nifi.controller;
 
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.authorization.Authorizer;
@@ -56,6 +48,15 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.w3c.dom.Document;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
 /**
  */
 @Ignore
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardProcessorNodeIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardProcessorNodeIT.java
index 1dd0125..7f0dda4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardProcessorNodeIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardProcessorNodeIT.java
@@ -17,34 +17,6 @@
 
 package org.apache.nifi.controller;
 
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
@@ -97,6 +69,34 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class StandardProcessorNodeIT {
 
     private MockVariableRegistry variableRegistry;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
index f9fad7b..b1975fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
@@ -16,14 +16,6 @@
  */
 package org.apache.nifi.controller.reporting;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
@@ -55,6 +47,15 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
 public class TestStandardReportingContext {
 
     private static final String DEFAULT_SENSITIVE_PROPS_KEY = "nififtw!";
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index 52d2b39..1243b5a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -99,6 +99,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.services.FlowService;
 import org.apache.nifi.util.FileUtils;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.revision.RevisionManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -265,7 +266,7 @@ public class FrameworkIntegrationTest {
             Mockito.when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeId);
 
             flowController = FlowController.createClusteredInstance(flowFileEventRepository, nifiProperties, authorizer, auditService, encryptor, protocolSender, bulletinRepo, clusterCoordinator,
-                heartbeatMonitor, leaderElectionManager, VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient, extensionManager);
+                heartbeatMonitor, leaderElectionManager, VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient, extensionManager, Mockito.mock(RevisionManager.class));
 
             flowController.setClustered(true, UUID.randomUUID().toString());
             flowController.setNodeId(localNodeId);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
index 0b888fc..3439046 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
@@ -140,8 +140,7 @@ public class HeadlessNiFiServer implements NiFiServer {
                     bulletinRepository,
                     variableRegistry,
                     flowRegistryClient,
-                    extensionManager
-                    );
+                    extensionManager);
 
             flowService = StandardFlowService.createStandaloneInstance(
                     flowController,
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
index df6c1ae..42a0763 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
@@ -27,8 +27,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * <p>
@@ -43,34 +45,40 @@ public class NaiveRevisionManager implements RevisionManager {
     private static final Logger logger = LoggerFactory.getLogger(NaiveRevisionManager.class);
 
     private final ConcurrentMap<String, Revision> revisionMap = new ConcurrentHashMap<>();
-
+    private final AtomicLong revisionUpdateCounter = new AtomicLong(0L);
 
     @Override
-    public void reset(final Collection<Revision> revisions) {
-        synchronized (this) { // avoid allowing two threads to reset versions concurrently
-            logger.info("Resetting Revisions for all components. {} revisions will be removed, {} will be added", revisionMap.size(), revisions.size());
-            logger.debug("New Revisions: {}", revisions);
+    public synchronized void reset(final RevisionSnapshot revisionSnapshot) {
+        final Collection<Revision> revisions = revisionSnapshot.getRevisions();
+        logger.info("Resetting Revisions for all components. {} revisions will be removed, {} will be added", revisionMap.size(), revisions.size());
+        logger.debug("New Revisions: {}", revisions);
 
-            revisionMap.clear();
+        revisionMap.clear();
 
-            for (final Revision revision : revisions) {
-                revisionMap.put(revision.getComponentId(), revision);
-            }
+        for (final Revision revision : revisions) {
+            revisionMap.put(revision.getComponentId(), revision);
         }
+
+        revisionUpdateCounter.set(revisionSnapshot.getRevisionUpdateCount());
+    }
+
+    @Override
+    public synchronized RevisionSnapshot getAllRevisions() {
+        return new RevisionSnapshot(revisionMap.values(), revisionUpdateCounter.get());
     }
 
     @Override
-    public List<Revision> getAllRevisions() {
-        return new ArrayList<>(revisionMap.values());
+    public synchronized long getRevisionUpdateCount() {
+        return revisionUpdateCounter.get();
     }
 
     @Override
-    public Revision getRevision(final String componentId) {
+    public synchronized Revision getRevision(final String componentId) {
         return revisionMap.computeIfAbsent(componentId, id -> new Revision(0L, null, componentId));
     }
 
     @Override
-    public <T> T deleteRevision(final RevisionClaim claim, final NiFiUser user, final DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException {
+    public synchronized <T> T deleteRevision(final RevisionClaim claim, final NiFiUser user, final DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException {
         Objects.requireNonNull(user);
         logger.debug("Attempting to delete revision using {}", claim);
         final List<Revision> revisionList = new ArrayList<>(claim.getRevisions());
@@ -92,11 +100,13 @@ public class NaiveRevisionManager implements RevisionManager {
             revisionMap.remove(revision.getComponentId());
         }
 
+        revisionUpdateCounter.addAndGet(revisionList.size());
+
         return taskResult;
     }
 
     @Override
-    public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final NiFiUser user, final UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException {
+    public synchronized <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final NiFiUser user, final UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException {
         Objects.requireNonNull(user);
         logger.debug("Attempting to update revision using {}", originalClaim);
 
@@ -122,9 +132,12 @@ public class NaiveRevisionManager implements RevisionManager {
         // If the update succeeded then put the updated revisions into the revisionMap
         // If an exception is thrown during the update we don't want to update revision so it is ok to bounce out of this method
         if (updatedComponent != null) {
-            for (final Revision updatedRevision : updatedComponent.getUpdatedRevisions()) {
+            final Set<Revision> updatedRevisions = updatedComponent.getUpdatedRevisions();
+            for (final Revision updatedRevision : updatedRevisions) {
                 revisionMap.put(updatedRevision.getComponentId(), updatedRevision);
             }
+
+            revisionUpdateCounter.addAndGet(updatedRevisions.size());
         }
 
         return updatedComponent;