You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/02/22 20:38:32 UTC

[nifi] branch main updated: NIFI-11192: Ensure that if ports moved between parent/child group in between flow versions that we can properly handle that. Added system tests to verify.

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 87e61c50ee NIFI-11192: Ensure that if ports moved between parent/child group in between flow versions that we can properly handle that. Added system tests to verify.
87e61c50ee is described below

commit 87e61c50eefeef46e6fc5e0080a70d2605a2799b
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Feb 21 11:28:54 2023 -0500

    NIFI-11192: Ensure that if ports moved between parent/child group in between flow versions that we can properly handle that. Added system tests to verify.
    
    NIFI-11192: If a failure is encountered when changing the version of a flow from 1 version to another, attempt to rollback the changes instead of just failing with the flow in a bad state
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #6981
---
 .../StandardVersionedComponentSynchronizer.java    |  41 ++-
 .../apache/nifi/groups/StandardProcessGroup.java   |   3 +-
 .../apache/nifi/util/FlowDifferenceFilters.java    |  38 +++
 .../apache/nifi/web/api/FlowUpdateResource.java    | 121 +++++---
 .../registry/FileSystemFlowRegistryClient.java     |  47 ++++
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  70 +++++
 .../system/registry/ClusteredRegistryClientIT.java |  29 ++
 .../tests/system/registry/RegistryClientIT.java    |  84 +++++-
 .../src/test/resources/conf/default/bootstrap.conf |   2 +-
 .../src/test/resources/conf/default/logback.xml    |   1 +
 .../test/resources/conf/default/nifi.properties    |   2 +-
 .../flow-with-invalid-connection/1/snapshot.json   | 305 +++++++++++++++++++++
 .../flow-with-invalid-connection/2/snapshot.json   | 156 +++++++++++
 .../test-flows/port-moved-groups/1/snapshot.json   | 305 +++++++++++++++++++++
 .../test-flows/port-moved-groups/2/snapshot.json   |   1 +
 .../cli/impl/client/nifi/ProcessGroupClient.java   |   2 +
 .../client/nifi/impl/JerseyProcessGroupClient.java |  16 ++
 17 files changed, 1178 insertions(+), 45 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index f7e64336ee..5659567149 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -683,18 +683,17 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
             // If the Connection's destination didn't change, nothing to do
             final String destinationVersionId = connection.getDestination().getVersionedComponentId().orElse(null);
             final String proposedDestinationId = proposedConnection.getDestination().getId();
-            if (Objects.equals(destinationVersionId, proposedDestinationId)) {
+            final String destinationGroupVersionId = connection.getDestination().getProcessGroup().getVersionedComponentId().orElse(null);
+            final String proposedDestinationGroupId = proposedConnection.getDestination().getGroupId();
+            if (Objects.equals(destinationVersionId, proposedDestinationId) && Objects.equals(destinationGroupVersionId, proposedDestinationGroupId)) {
                 continue;
             }
 
             // Find the destination of the connection. If the destination doesn't yet exist (because it's part of the proposed Process Group but not yet added),
             // we will set the destination to a temporary destination. Then, after adding components, we will update the destinations again.
             Connectable newDestination = getConnectable(group, proposedConnection.getDestination());
-            if (
-                newDestination == null
-                ||
-                (newDestination.getConnectableType() == ConnectableType.OUTPUT_PORT && !newDestination.getProcessGroup().equals(connection.getProcessGroup()))
-            ) {
+            final boolean useTempDestination = isTempDestinationNecessary(connection, proposedConnection, newDestination);
+            if (useTempDestination) {
                 final Funnel temporaryDestination = getTemporaryFunnel(connection.getProcessGroup());
                 LOG.debug("Updated Connection {} to have a temporary destination of {}", connection, temporaryDestination);
                 newDestination = temporaryDestination;
@@ -707,6 +706,36 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
         return connectionsWithTempDestination;
     }
 
+    private boolean isTempDestinationNecessary(final Connection existingConnection, final VersionedConnection proposedConnection, final Connectable newDestination) {
+        if (newDestination == null) {
+            return true;
+        }
+
+        // If the destination is an Input Port or an Output Port and the group changed, use a temp destination
+        final ConnectableType connectableType = newDestination.getConnectableType();
+        final boolean port = connectableType == ConnectableType.OUTPUT_PORT || connectableType == ConnectableType.INPUT_PORT;
+        final boolean groupChanged = !newDestination.getProcessGroup().equals(existingConnection.getProcessGroup());
+        if (port && groupChanged) {
+            return true;
+        }
+
+        // If the proposed destination has a different group than the existing group, use a temp destination.
+        final String proposedDestinationGroupId = proposedConnection.getDestination().getGroupId();
+        final String destinationGroupVersionedComponentId = existingConnection.getDestination().getProcessGroup().getVersionedComponentId().orElse(null);
+        if (!Objects.equals(proposedDestinationGroupId, destinationGroupVersionedComponentId)) {
+            return true;
+        }
+
+        // If the proposed connection exists in a different group than the existing group, use a temp destination.
+        final String connectionGroupVersionedComponentId = existingConnection.getProcessGroup().getVersionedComponentId().orElse(null);
+        final String proposedGroupId = proposedConnection.getGroupIdentifier();
+        if (!Objects.equals(proposedGroupId, connectionGroupVersionedComponentId)) {
+            return true;
+        }
+
+        return false;
+    }
+
     private Funnel getTemporaryFunnel(final ProcessGroup group) {
         final String tempFunnelId = group.getIdentifier() + TEMP_FUNNEL_ID_SUFFIX;
         Funnel temporaryFunnel = context.getFlowManager().getFunnel(tempFunnelId);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 1206256073..6b90bb10d7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -4075,7 +4075,8 @@ public final class StandardProcessGroup implements ProcessGroup {
             final VersionControlInformation versionControlInfo = getVersionControlInformation();
             if (versionControlInfo != null) {
                 if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getMetadata().getFlowIdentifier())) {
-                    throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
+                    throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with. Currently synced to " +
+                        "flow with ID " + versionControlInfo.getFlowIdentifier() + " but proposed flow's metadata shows flow identifier as " + updatedFlow.getMetadata().getFlowIdentifier());
                 }
 
                 if (verifyNotDirty) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
index c9e07622e0..db714946db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
@@ -58,6 +58,7 @@ public class FlowDifferenceFilters {
     public static boolean isEnvironmentalChange(final FlowDifference difference, final VersionedProcessGroup localGroup, final FlowManager flowManager) {
         return difference.getDifferenceType() == DifferenceType.BUNDLE_CHANGED
             || isVariableValueChange(difference)
+            || isSensitivePropertyDueToGhosting(difference, flowManager)
             || isAncestorVariableAdded(difference, flowManager)
             || isRpgUrlChange(difference)
             || isAddedOrRemovedRemotePort(difference)
@@ -75,6 +76,43 @@ public class FlowDifferenceFilters {
             || isParameterContextChange(difference);
     }
 
+    private static boolean isSensitivePropertyDueToGhosting(final FlowDifference difference, final FlowManager flowManager) {
+        if (difference.getDifferenceType() != DifferenceType.PROPERTY_SENSITIVITY_CHANGED) {
+            return false;
+        }
+
+        final String componentAId = difference.getComponentA().getInstanceIdentifier();
+        if (componentAId != null) {
+            final ComponentNode componentNode = getComponent(flowManager, difference.getComponentA().getComponentType(), componentAId);
+            if (componentNode != null && componentNode.isExtensionMissing()) {
+                return true;
+            }
+        }
+
+        final String componentBId = difference.getComponentB().getInstanceIdentifier();
+        if (componentBId != null) {
+            final ComponentNode componentNode = getComponent(flowManager, difference.getComponentA().getComponentType(), componentBId);
+            if (componentNode != null && componentNode.isExtensionMissing()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private static ComponentNode getComponent(final FlowManager flowManager, final ComponentType componentType, final String componentId) {
+        switch (componentType) {
+            case CONTROLLER_SERVICE:
+                return flowManager.getControllerServiceNode(componentId);
+            case PROCESSOR:
+                return flowManager.getProcessorNode(componentId);
+            case REPORTING_TASK:
+                return flowManager.getReportingTaskNode(componentId);
+        }
+
+        return null;
+    }
+
     // The Registry URL may change if, for instance, a registry is moved to a new host, or is made secure, the port changes, etc.
     // Since this can be handled by the client anyway, there's no need to flag this as a 'local modification'
     private static boolean isRegistryUrlChange(final FlowDifference difference) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
index 3bc971d25b..6b3311c572 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
@@ -28,10 +28,10 @@ import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.flow.VersionedParameterContext;
 import org.apache.nifi.flow.VersionedProcessGroup;
 import org.apache.nifi.registry.flow.FlowRegistryUtils;
 import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
-import org.apache.nifi.flow.VersionedParameterContext;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.ResumeFlowException;
@@ -53,6 +53,7 @@ import org.apache.nifi.web.api.entity.PortEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupDescriptorEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
 import org.apache.nifi.web.util.AffectedComponentUtils;
 import org.apache.nifi.web.util.CancellableTimedPause;
 import org.apache.nifi.web.util.ComponentLifecycle;
@@ -375,47 +376,32 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
         }
         asyncRequest.markStepComplete();
 
+        // Get the Original Flow Snapshot in case we fail to update and need to rollback
+        final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(groupId);
+        final RegisteredFlowSnapshot originalFlowSnapshot = serviceFacade.getVersionedFlowSnapshot(vciEntity.getVersionControlInformation(), true);
+
         try {
             if (replicateRequest) {
                 // If replicating request, steps 9-11 are performed on each node individually
+                final URI replicateUri = buildUri(requestUri, replicateUriPath, null);
                 final NiFiUser user = NiFiUserUtils.getNiFiUser();
 
-                URI replicateUri = null;
                 try {
-                    replicateUri = new URI(requestUri.getScheme(), requestUri.getUserInfo(), requestUri.getHost(), requestUri.getPort(),
-                            replicateUriPath, null, requestUri.getFragment());
-                } catch (URISyntaxException e) {
-                    throw new RuntimeException(e);
-                }
-
-                final Map<String, String> headers = new HashMap<>();
-                headers.put("content-type", MediaType.APPLICATION_JSON);
-
-                // each concrete class creates its own type of entity for replication
-                final Entity replicateEntity = createReplicateUpdateFlowEntity(revision, requestEntity, flowSnapshot);
-
-                final NodeResponse clusterResponse;
-                try {
-                    logger.debug("Replicating PUT request to {} for user {}", replicateUri, user);
-
-                    if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
-                        clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
+                    final NodeResponse clusterResponse = replicateFlowUpdateRequest(replicateUri, user, requestEntity, revision, flowSnapshot);
+                    verifyResponseCode(clusterResponse, replicateUri, user, "update");
+                } catch (final Exception e) {
+                    if (originalFlowSnapshot == null) {
+                        logger.debug("Failed to update flow but could not determine original flow to rollback to so will not make any attempt to revert the flow.");
                     } else {
-                        clusterResponse = getRequestReplicator().forwardToCoordinator(
-                                getClusterCoordinatorNode(), user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
+                        try {
+                            final NodeResponse rollbackResponse = replicateFlowUpdateRequest(replicateUri, user, requestEntity, revision, originalFlowSnapshot);
+                            verifyResponseCode(rollbackResponse, replicateUri, user, "rollback");
+                        } catch (final Exception inner) {
+                            e.addSuppressed(inner);
+                        }
                     }
-                } catch (final InterruptedException ie) {
-                    logger.warn("Interrupted while replicating PUT request to {} for user {}", replicateUri, user);
-                    Thread.currentThread().interrupt();
-                    throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
-                }
 
-                final int updateFlowStatus = clusterResponse.getStatus();
-                if (updateFlowStatus != Status.OK.getStatusCode()) {
-                    final String explanation = getResponseEntity(clusterResponse, String.class);
-                    logger.error("Failed to update flow across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}",
-                            replicateUri, user, updateFlowStatus, explanation);
-                    throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation);
+                    throw e;
                 }
             } else {
                 // Step 9: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot,
@@ -425,7 +411,27 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
 
                 // Step 10-11. Update Process Group to the new flow and update variable registry with any Variables that were added or removed.
                 // Each concrete class defines its own update flow functionality
-                performUpdateFlow(groupId, revision, requestEntity, flowSnapshot, idGenerationSeed, !allowDirtyFlowUpdate, true);
+                try {
+                    performUpdateFlow(groupId, revision, requestEntity, flowSnapshot, idGenerationSeed, !allowDirtyFlowUpdate, true);
+                } catch (final Exception e) {
+                    // If clustered, just throw the original Exception.
+                    // Otherwise, rollback the flow update. We do not perform the rollback if clustered because
+                    // we want this to be handled at a higher level, allowing the request to replace our flow version to come from the coordinator
+                    // if any node fails to perform the update.
+                    if (isClustered()) {
+                        throw e;
+                    }
+
+                    // Rollback the update to the original flow snapshot. If there's any Exception, add it as a Suppressed Exception to the original so
+                    // that it can be logged but not overtake the original Exception as the cause.
+                    try {
+                        performUpdateFlow(groupId, revision, requestEntity, originalFlowSnapshot, idGenerationSeed, false, true);
+                    } catch (final Exception inner) {
+                        e.addSuppressed(inner);
+                    }
+
+                    throw e;
+                }
             }
         } finally {
             if (!asyncRequest.isCancelled()) {
@@ -530,6 +536,53 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
         asyncRequest.setCancelCallback(null);
     }
 
+    private URI buildUri(final URI requestUri, final String path, final String query) {
+        try {
+            return new URI(requestUri.getScheme(), requestUri.getUserInfo(), requestUri.getHost(), requestUri.getPort(),
+                path, query, requestUri.getFragment());
+        } catch (final URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void verifyResponseCode(final NodeResponse response, final URI uri, final NiFiUser user, final String actionDescription) throws LifecycleManagementException {
+        final int updateFlowStatus = response.getStatus();
+        if (updateFlowStatus != Status.OK.getStatusCode()) {
+            final String explanation = getResponseEntity(response, String.class);
+            logger.error("Failed to {} flow update across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}",
+                actionDescription, uri, user, updateFlowStatus, explanation);
+            throw new LifecycleManagementException("Failed to " + actionDescription + " flow on all nodes in cluster due to " + explanation);
+        }
+    }
+
+    private NodeResponse replicateFlowUpdateRequest(final URI replicateUri, final NiFiUser user, final T requestEntity, final Revision revision, final RegisteredFlowSnapshot flowSnapshot)
+                throws LifecycleManagementException {
+
+        final Map<String, String> headers = new HashMap<>();
+        headers.put("content-type", MediaType.APPLICATION_JSON);
+
+        // each concrete class creates its own type of entity for replication
+        final Entity replicateEntity = createReplicateUpdateFlowEntity(revision, requestEntity, flowSnapshot);
+
+        final NodeResponse clusterResponse;
+        try {
+            logger.debug("Replicating PUT request to {} for user {}", replicateUri, user);
+
+            if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
+            } else {
+                clusterResponse = getRequestReplicator().forwardToCoordinator(
+                    getClusterCoordinatorNode(), user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
+            }
+        } catch (final InterruptedException ie) {
+            logger.warn("Interrupted while replicating PUT request to {} for user {}", replicateUri, user);
+            Thread.currentThread().interrupt();
+            throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
+        }
+
+        return clusterResponse;
+    }
+
     /**
      * Get a list of steps to perform for upload flow
      */
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java
index f47846361c..8bf24b6b59 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java
@@ -48,6 +48,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.OptionalInt;
 import java.util.Set;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
@@ -209,13 +210,59 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
         final File versionDir = new File(flowDir, String.valueOf(version));
         final File snapshotFile = new File(versionDir, "snapshot.json");
 
+        final Pattern intPattern = Pattern.compile("\\d+");
+        final File[] versionFiles = flowDir.listFiles(file -> intPattern.matcher(file.getName()).matches());
+
         final JsonFactory factory = new JsonFactory(objectMapper);
         try (final JsonParser parser = factory.createParser(snapshotFile)) {
             final RegisteredFlowSnapshot snapshot = parser.readValueAs(RegisteredFlowSnapshot.class);
+            populateBucket(snapshot, bucketId);
+            populateFlow(snapshot, bucketId, flowId, version, versionFiles == null ? 0 : versionFiles.length);
+
             return snapshot;
         }
     }
 
+    private void populateBucket(final RegisteredFlowSnapshot snapshot, final String bucketId) {
+        final FlowRegistryBucket existingBucket = snapshot.getBucket();
+        if (existingBucket != null) {
+            return;
+        }
+
+        final FlowRegistryBucket bucket = new FlowRegistryBucket();
+        bucket.setCreatedTimestamp(System.currentTimeMillis());
+        bucket.setIdentifier(bucketId);
+        bucket.setName(bucketId);
+        bucket.setPermissions(createAllowAllPermissions());
+        snapshot.setBucket(bucket);
+
+        snapshot.getSnapshotMetadata().setBucketIdentifier(bucketId);
+    }
+
+    private void populateFlow(final RegisteredFlowSnapshot snapshot, final String bucketId, final String flowId, final int version, final int numVersions) {
+        final RegisteredFlow existingFlow = snapshot.getFlow();
+        if (existingFlow != null) {
+            return;
+        }
+
+        final RegisteredFlow flow = new RegisteredFlow();
+        flow.setCreatedTimestamp(System.currentTimeMillis());
+        flow.setLastModifiedTimestamp(System.currentTimeMillis());
+        flow.setBucketIdentifier(bucketId);
+        flow.setBucketName(bucketId);
+        flow.setIdentifier(flowId);
+        flow.setName(flowId);
+        flow.setPermissions(createAllowAllPermissions());
+        flow.setVersionCount(numVersions);
+
+        final RegisteredFlowVersionInfo versionInfo = new RegisteredFlowVersionInfo();
+        versionInfo.setVersion(version);
+        flow.setVersionInfo(versionInfo);
+
+        snapshot.setFlow(flow);
+        snapshot.getSnapshotMetadata().setFlowIdentifier(flowId);
+    }
+
     @Override
     public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot) throws IOException {
         final File rootDir = getRootDirectory(context);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index b6f1c37623..0fb983a8f5 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -38,6 +38,7 @@ import org.apache.nifi.web.api.dto.ConnectionDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.CounterDTO;
 import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
+import org.apache.nifi.web.api.dto.DifferenceDTO;
 import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
 import org.apache.nifi.web.api.dto.FlowRegistryClientDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
@@ -75,6 +76,7 @@ import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
 import org.apache.nifi.web.api.entity.CountersEntity;
 import org.apache.nifi.web.api.entity.DropRequestEntity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
 import org.apache.nifi.web.api.entity.FlowEntity;
 import org.apache.nifi.web.api.entity.FlowFileEntity;
 import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
@@ -1652,6 +1654,74 @@ public class NiFiClientUtil {
         return nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, groupEntity);
     }
 
+    public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final int version) throws NiFiClientException, IOException, InterruptedException {
+        final ProcessGroupEntity groupEntity = nifiClient.getProcessGroupClient().getProcessGroup(processGroupId);
+        final ProcessGroupDTO groupDto = groupEntity.getComponent();
+        final VersionControlInformationDTO vciDto = groupDto.getVersionControlInformation();
+        if (vciDto == null) {
+            throw new IllegalArgumentException("Process Group with ID " + processGroupId + " is not under Version Control");
+        }
+
+        vciDto.setVersion(version);
+
+        final VersionControlInformationEntity requestEntity = new VersionControlInformationEntity();
+        requestEntity.setProcessGroupRevision(groupEntity.getRevision());
+        requestEntity.setVersionControlInformation(vciDto);
+
+        final VersionedFlowUpdateRequestEntity result = nifiClient.getVersionsClient().updateVersionControlInfo(processGroupId, requestEntity);
+        return waitForVersionFlowUpdateComplete(result.getRequest().getRequestId());
+    }
+
+    public VersionedFlowUpdateRequestEntity waitForVersionFlowUpdateComplete(final String updateRequestId) throws NiFiClientException, IOException, InterruptedException {
+        while (true) {
+            final VersionedFlowUpdateRequestEntity result = nifiClient.getVersionsClient().getUpdateRequest(updateRequestId);
+            final boolean complete = result.getRequest().isComplete();
+            if (complete) {
+                return nifiClient.getVersionsClient().deleteUpdateRequest(updateRequestId);
+            }
+
+            logger.debug("Waiting for Version Flow Update request to complete...");
+            Thread.sleep(100L);
+        }
+    }
+
+    public void assertFlowStaleAndUnmodified(final String processGroupId) throws NiFiClientException, IOException {
+        final String state = nifiClient.getProcessGroupClient().getProcessGroup(processGroupId).getVersionedFlowState();
+        if ("STALE".equalsIgnoreCase(state)) {
+            return;
+        }
+
+        if ("LOCALLY_MODIFIED_AND_STALE".equalsIgnoreCase(state)) {
+            final FlowComparisonEntity flowComparisonEntity = nifiClient.getProcessGroupClient().getLocalModifications(processGroupId);
+            final String differences = flowComparisonEntity.getComponentDifferences().stream()
+                .flatMap(dto -> dto.getDifferences().stream())
+                .map(DifferenceDTO::getDifference)
+                .collect(Collectors.joining("\n"));
+            throw new AssertionError("Expected state to be STALE but was " + state + " with the following modifications:\n" + differences);
+        }
+
+        throw new AssertionError("Expected state to be STALE but was " + state);
+    }
+
+    public void assertFlowUpToDate(final String processGroupId) throws NiFiClientException, IOException {
+        final String state = nifiClient.getProcessGroupClient().getProcessGroup(processGroupId).getVersionedFlowState();
+        if ("UP_TO_DATE".equalsIgnoreCase(state)) {
+            return;
+        }
+
+        if ("LOCALLY_MODIFIED".equalsIgnoreCase(state)) {
+            final FlowComparisonEntity flowComparisonEntity = nifiClient.getProcessGroupClient().getLocalModifications(processGroupId);
+            final String differences = flowComparisonEntity.getComponentDifferences().stream()
+                .flatMap(dto -> dto.getDifferences().stream())
+                .map(DifferenceDTO::getDifference)
+                .collect(Collectors.joining("\n"));
+            throw new AssertionError("Expected state to be UP_TO_DATE but was LOCALLY_MODIFIED with the following modifications:\n" + differences);
+        }
+
+        throw new AssertionError("Expected state to be UP_TO_DATE but was " + state);
+
+    }
+
     public FlowEntity copyAndPaste(final ProcessGroupEntity pgEntity, final String destinationGroupId) throws NiFiClientException, IOException {
         final SnippetDTO snippetDto = new SnippetDTO();
         snippetDto.setProcessGroups(Collections.singletonMap(pgEntity.getId(), pgEntity.getRevision()));
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ClusteredRegistryClientIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ClusteredRegistryClientIT.java
new file mode 100644
index 0000000000..d592323775
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ClusteredRegistryClientIT.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.registry;
+
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+
+public class ClusteredRegistryClientIT extends RegistryClientIT {
+
+    @Override
+    public NiFiInstanceFactory getInstanceFactory() {
+        return createTwoNodeInstanceFactory();
+    }
+
+}
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
index 9d9334951e..40bf8c32b7 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
@@ -20,6 +20,7 @@ package org.apache.nifi.tests.system.registry;
 import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.tests.system.NiFiSystemIT;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
@@ -30,6 +31,7 @@ import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
+import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -37,14 +39,87 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class RegistryClientIT extends NiFiSystemIT {
+
+    @Test
+    public void testChangeVersionWithPortMoveBetweenGroups() throws NiFiClientException, IOException, InterruptedException {
+        final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows"));
+
+        final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), "test-flows", "port-moved-groups", 1);
+        assertNotNull(imported);
+        getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
+
+        // Ensure that the import worked as expected
+        final FlowSnippetDTO groupContents = imported.getComponent().getContents();
+        final List<ProcessorDTO> replaceTextProcessors = groupContents.getProcessors().stream()
+            .filter(proc -> proc.getName().equals("ReplaceText"))
+            .collect(Collectors.toList());
+        assertEquals(1, replaceTextProcessors.size());
+
+        assertTrue(groupContents.getInputPorts().isEmpty());
+
+        // Change to version 2
+        final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), 2);
+        assertNull(version2Result.getRequest().getFailureReason());
+
+        final FlowDTO v2Contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow();
+        getClientUtil().assertFlowUpToDate(imported.getId());
+
+        // Ensure that the ReplaceText processor still exists
+        final long replaceTextCount = v2Contents.getProcessors().stream()
+            .map(ProcessorEntity::getComponent)
+            .filter(proc -> proc.getName().equals("ReplaceText"))
+            .filter(proc -> proc.getId().equals(replaceTextProcessors.get(0).getId()))
+            .count();
+        assertEquals(1, replaceTextCount);
+
+        // Ensure that we now have a Port at the top level
+        assertEquals(1, v2Contents.getInputPorts().size());
+
+        // Change back to Version 1
+        final VersionedFlowUpdateRequestEntity changeBackToV1Result = getClientUtil().changeFlowVersion(imported.getId(), 1);
+        assertNull(changeBackToV1Result.getRequest().getFailureReason());
+
+        final FlowDTO v1Contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow();
+        getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
+
+        // Ensure that we no longer have a Port at the top level
+        assertTrue(v1Contents.getInputPorts().isEmpty());
+    }
+
+
+    @Test
+    public void testRollbackOnFailure() throws NiFiClientException, IOException, InterruptedException {
+        final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows"));
+
+        final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), "test-flows", "flow-with-invalid-connection", 1);
+        assertNotNull(imported);
+        getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
+
+        final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), 2);
+        final String failureReason = version2Result.getRequest().getFailureReason();
+        assertNotNull(failureReason);
+
+        // Ensure that we're still on v1 of the flow and there are no local modifications
+        getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
+
+        // Ensure that the processors still exist
+        final FlowDTO contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow();
+        assertEquals(1, contents.getProcessors().size());
+    }
+
+
     @Test
     public void testStartVersionControlThenImport() throws NiFiClientException, IOException {
         final FlowRegistryClientEntity clientEntity = registerClient();
@@ -68,10 +143,15 @@ public class RegistryClientIT extends NiFiSystemIT {
     }
 
     private FlowRegistryClientEntity registerClient() throws NiFiClientException, IOException {
-        final String clientName = String.format("FileRegistry-%s", UUID.randomUUID());
-        final FlowRegistryClientEntity clientEntity = getClientUtil().createFlowRegistryClient(clientName);
         final File storageDir = new File("target/flowRegistryStorage/" + getTestName().replace("\\(.*?\\)", ""));
         Files.createDirectories(storageDir.toPath());
+
+        return registerClient(storageDir);
+    }
+
+    private FlowRegistryClientEntity registerClient(final File storageDir) throws NiFiClientException, IOException {
+        final String clientName = String.format("FileRegistry-%s", UUID.randomUUID());
+        final FlowRegistryClientEntity clientEntity = getClientUtil().createFlowRegistryClient(clientName);
         getClientUtil().updateRegistryClientProperties(clientEntity, Collections.singletonMap("Directory", storageDir.getAbsolutePath()));
 
         return clientEntity;
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
index 051ca4d936..86900d809e 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
@@ -27,7 +27,7 @@ java.arg.3=-Xmx512m
 
 java.arg.14=-Djava.awt.headless=true
 
-#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8002
+java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002
 
 # Disable Logback web shutdown hook using System property
 java.arg.logbackShutdown=-DlogbackDisableServletContainerInitializer=true
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml
index 9471514f10..1a62d56362 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml
@@ -113,6 +113,7 @@
     <logger name="org.apache.nifi.controller.reporting.LogComponentStatuses" level="ERROR" />
 
     <logger name="org.apache.calcite.runtime.CalciteException" level="OFF" />
+    <logger name="deprecation" level="OFF" />
 
     <logger name="org.apache.curator.framework.recipes.leader.LeaderSelector" level="OFF" />
     <logger name="org.apache.curator.ConnectionState" level="OFF" />
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
index 7c6426c67b..222206984e 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
@@ -248,4 +248,4 @@ nifi.kerberos.spnego.authentication.expiration=12 hours
 
 # external properties files for variable registry
 # supports a comma delimited list of file locations
-nifi.variable.registry.properties=
\ No newline at end of file
+nifi.variable.registry.properties=
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/flow-with-invalid-connection/1/snapshot.json b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/flow-with-invalid-connection/1/snapshot.json
new file mode 100644
index 0000000000..4c2d4f0227
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/flow-with-invalid-connection/1/snapshot.json
@@ -0,0 +1,305 @@
+{
+  "externalControllerServices": {},
+  "flowContents": {
+    "comments": "",
+    "componentType": "PROCESS_GROUP",
+    "connections": [
+      {
+        "backPressureDataSizeThreshold": "1 GB",
+        "backPressureObjectThreshold": 10000,
+        "bends": [],
+        "componentType": "CONNECTION",
+        "destination": {
+          "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+          "id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+          "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+          "name": "in",
+          "type": "INPUT_PORT"
+        },
+        "flowFileExpiration": "0 sec",
+        "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+        "identifier": "ff659054-0dee-3d85-b88f-6a4fc53639bd",
+        "instanceIdentifier": "5bd1fbc3-0186-1000-ec2a-0600eb0f67c9",
+        "labelIndex": 1,
+        "loadBalanceCompression": "DO_NOT_COMPRESS",
+        "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+        "name": "",
+        "partitioningAttribute": "",
+        "prioritizers": [],
+        "selectedRelationships": [
+          "failure",
+          "success"
+        ],
+        "source": {
+          "comments": "",
+          "groupId": "30330c01-185a-3217-b764-39aef8d2a05f",
+          "id": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
+          "instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
+          "name": "ReplaceText",
+          "type": "PROCESSOR"
+        },
+        "zIndex": 0
+      }
+    ],
+    "controllerServices": [],
+    "defaultBackPressureDataSizeThreshold": "1 GB",
+    "defaultBackPressureObjectThreshold": 10000,
+    "defaultFlowFileExpiration": "0 sec",
+    "flowFileConcurrency": "UNBOUNDED",
+    "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+    "funnels": [],
+    "identifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+    "inputPorts": [],
+    "instanceIdentifier": "5bd17e2a-0186-1000-48b9-aa19b73d5a66",
+    "labels": [],
+    "name": "parent",
+    "outputPorts": [],
+    "position": {
+      "x": 614.0,
+      "y": 462.0
+    },
+    "processGroups": [
+      {
+        "comments": "",
+        "componentType": "PROCESS_GROUP",
+        "connections": [
+          {
+            "backPressureDataSizeThreshold": "1 GB",
+            "backPressureObjectThreshold": 10000,
+            "bends": [],
+            "componentType": "CONNECTION",
+            "destination": {
+              "comments": "",
+              "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+              "id": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
+              "instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
+              "name": "UpdateAttribute",
+              "type": "PROCESSOR"
+            },
+            "flowFileExpiration": "0 sec",
+            "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+            "identifier": "776f1f03-e742-3287-841b-8aedb9ada176",
+            "instanceIdentifier": "5bd1bdd3-0186-1000-dc09-4c6d9bffa220",
+            "labelIndex": 1,
+            "loadBalanceCompression": "DO_NOT_COMPRESS",
+            "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+            "name": "",
+            "partitioningAttribute": "",
+            "prioritizers": [],
+            "selectedRelationships": [
+              ""
+            ],
+            "source": {
+              "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+              "id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+              "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+              "name": "in",
+              "type": "INPUT_PORT"
+            },
+            "zIndex": 0
+          }
+        ],
+        "controllerServices": [],
+        "defaultBackPressureDataSizeThreshold": "1 GB",
+        "defaultBackPressureObjectThreshold": 10000,
+        "defaultFlowFileExpiration": "0 sec",
+        "flowFileConcurrency": "UNBOUNDED",
+        "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+        "funnels": [],
+        "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+        "identifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+        "inputPorts": [
+          {
+            "allowRemoteAccess": false,
+            "componentType": "INPUT_PORT",
+            "concurrentlySchedulableTaskCount": 1,
+            "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+            "identifier": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+            "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+            "name": "in",
+            "position": {
+              "x": 784.0,
+              "y": 146.0
+            },
+            "scheduledState": "ENABLED",
+            "type": "INPUT_PORT"
+          }
+        ],
+        "instanceIdentifier": "5bd1937d-0186-1000-eba8-f831e822907c",
+        "labels": [],
+        "name": "child",
+        "outputPorts": [],
+        "position": {
+          "x": 824.0,
+          "y": 648.0
+        },
+        "processGroups": [],
+        "processors": [
+          {
+            "autoTerminatedRelationships": [],
+            "backoffMechanism": "PENALIZE_FLOWFILE",
+            "bulletinLevel": "WARN",
+            "bundle": {
+              "artifact": "nifi-update-attribute-nar",
+              "group": "org.apache.nifi",
+              "version": "1.16.0.2.1.4.0-53"
+            },
+            "comments": "",
+            "componentType": "PROCESSOR",
+            "concurrentlySchedulableTaskCount": 1,
+            "executionNode": "ALL",
+            "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+            "identifier": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
+            "instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
+            "maxBackoffPeriod": "10 mins",
+            "name": "UpdateAttribute",
+            "penaltyDuration": "30 sec",
+            "position": {
+              "x": 735.0,
+              "y": 310.0
+            },
+            "properties": {
+              "Store State": "Do not store state",
+              "canonical-value-lookup-cache-size": "100"
+            },
+            "propertyDescriptors": {
+              "Delete Attributes Expression": {
+                "displayName": "Delete Attributes Expression",
+                "identifiesControllerService": false,
+                "name": "Delete Attributes Expression",
+                "sensitive": false
+              },
+              "Store State": {
+                "displayName": "Store State",
+                "identifiesControllerService": false,
+                "name": "Store State",
+                "sensitive": false
+              },
+              "canonical-value-lookup-cache-size": {
+                "displayName": "Cache Value Lookup Cache Size",
+                "identifiesControllerService": false,
+                "name": "canonical-value-lookup-cache-size",
+                "sensitive": false
+              },
+              "Stateful Variables Initial Value": {
+                "displayName": "Stateful Variables Initial Value",
+                "identifiesControllerService": false,
+                "name": "Stateful Variables Initial Value",
+                "sensitive": false
+              }
+            },
+            "retriedRelationships": [],
+            "retryCount": 10,
+            "runDurationMillis": 0,
+            "scheduledState": "ENABLED",
+            "schedulingPeriod": "0 sec",
+            "schedulingStrategy": "TIMER_DRIVEN",
+            "style": {},
+            "type": "org.apache.nifi.processors.attributes.UpdateAttribute",
+            "yieldDuration": "1 sec"
+          }
+        ],
+        "remoteProcessGroups": [],
+        "variables": {}
+      }
+    ],
+    "processors": [
+      {
+        "autoTerminatedRelationships": [],
+        "backoffMechanism": "PENALIZE_FLOWFILE",
+        "bulletinLevel": "WARN",
+        "bundle": {
+          "artifact": "nifi-standard-nar",
+          "group": "org.apache.nifi",
+          "version": "1.16.0.2.1.4.0-53"
+        },
+        "comments": "",
+        "componentType": "PROCESSOR",
+        "concurrentlySchedulableTaskCount": 1,
+        "executionNode": "ALL",
+        "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+        "identifier": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
+        "instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
+        "maxBackoffPeriod": "10 mins",
+        "name": "ReplaceText",
+        "penaltyDuration": "30 sec",
+        "position": {
+          "x": 848.0,
+          "y": 384.0
+        },
+        "properties": {
+          "Regular Expression": "(?s)(^.*$)",
+          "Replacement Value": "$1",
+          "Evaluation Mode": "Line-by-Line",
+          "Line-by-Line Evaluation Mode": "All",
+          "Character Set": "UTF-8",
+          "Maximum Buffer Size": "1 MB",
+          "Replacement Strategy": "Regex Replace"
+        },
+        "propertyDescriptors": {
+          "Regular Expression": {
+            "displayName": "Search Value",
+            "identifiesControllerService": false,
+            "name": "Regular Expression",
+            "sensitive": false
+          },
+          "Replacement Value": {
+            "displayName": "Replacement Value",
+            "identifiesControllerService": false,
+            "name": "Replacement Value",
+            "sensitive": false
+          },
+          "Evaluation Mode": {
+            "displayName": "Evaluation Mode",
+            "identifiesControllerService": false,
+            "name": "Evaluation Mode",
+            "sensitive": false
+          },
+          "Line-by-Line Evaluation Mode": {
+            "displayName": "Line-by-Line Evaluation Mode",
+            "identifiesControllerService": false,
+            "name": "Line-by-Line Evaluation Mode",
+            "sensitive": false
+          },
+          "Character Set": {
+            "displayName": "Character Set",
+            "identifiesControllerService": false,
+            "name": "Character Set",
+            "sensitive": false
+          },
+          "Maximum Buffer Size": {
+            "displayName": "Maximum Buffer Size",
+            "identifiesControllerService": false,
+            "name": "Maximum Buffer Size",
+            "sensitive": false
+          },
+          "Replacement Strategy": {
+            "displayName": "Replacement Strategy",
+            "identifiesControllerService": false,
+            "name": "Replacement Strategy",
+            "sensitive": false
+          }
+        },
+        "retriedRelationships": [],
+        "retryCount": 10,
+        "runDurationMillis": 0,
+        "scheduledState": "ENABLED",
+        "schedulingPeriod": "0 sec",
+        "schedulingStrategy": "TIMER_DRIVEN",
+        "style": {},
+        "type": "org.apache.nifi.processors.standard.ReplaceText",
+        "yieldDuration": "1 sec"
+      }
+    ],
+    "remoteProcessGroups": [],
+    "variables": {}
+  },
+  "flowEncodingVersion": "1.0",
+  "parameterContexts": {},
+  "snapshotMetadata": {
+    "author": "anonymous",
+    "comments": "",
+    "timestamp": 1676577758403,
+    "version": 1
+  }
+}
\ No newline at end of file
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/flow-with-invalid-connection/2/snapshot.json b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/flow-with-invalid-connection/2/snapshot.json
new file mode 100644
index 0000000000..d68c67c90e
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/flow-with-invalid-connection/2/snapshot.json
@@ -0,0 +1,156 @@
+{
+  "externalControllerServices": {},
+  "flowContents": {
+    "comments": "",
+    "componentType": "PROCESS_GROUP",
+    "connections": [
+      {
+        "backPressureDataSizeThreshold": "1 GB",
+        "backPressureObjectThreshold": 10000,
+        "bends": [],
+        "componentType": "CONNECTION",
+        "destination": {
+          "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+          "id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+          "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+          "name": "in",
+          "type": "INPUT_PORT"
+        },
+        "flowFileExpiration": "0 sec",
+        "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+        "identifier": "ff659054-0dee-3d85-b88f-6a4fc53639bd",
+        "instanceIdentifier": "5bd1fbc3-0186-1000-ec2a-0600eb0f67c9",
+        "labelIndex": 1,
+        "loadBalanceCompression": "DO_NOT_COMPRESS",
+        "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+        "name": "",
+        "partitioningAttribute": "",
+        "prioritizers": [],
+        "selectedRelationships": [
+          "failure",
+          "success"
+        ],
+        "source": {
+          "comments": "",
+          "groupId": "30330c01-185a-3217-b764-39aef8d2a05f",
+          "id": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
+          "instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
+          "name": "ReplaceText",
+          "type": "PROCESSOR"
+        },
+        "zIndex": 0
+      }
+    ],
+    "controllerServices": [],
+    "defaultBackPressureDataSizeThreshold": "1 GB",
+    "defaultBackPressureObjectThreshold": 10000,
+    "defaultFlowFileExpiration": "0 sec",
+    "flowFileConcurrency": "UNBOUNDED",
+    "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+    "funnels": [],
+    "identifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+    "inputPorts": [],
+    "instanceIdentifier": "5bd17e2a-0186-1000-48b9-aa19b73d5a66",
+    "labels": [],
+    "name": "parent",
+    "outputPorts": [],
+    "position": {
+      "x": 614.0,
+      "y": 462.0
+    },
+    "processGroups": [
+      {
+        "comments": "",
+        "componentType": "PROCESS_GROUP",
+        "connections": [
+          {
+            "backPressureDataSizeThreshold": "1 GB",
+            "backPressureObjectThreshold": 10000,
+            "bends": [],
+            "componentType": "CONNECTION",
+            "destination": {
+              "comments": "",
+              "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+              "id": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
+              "instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
+              "name": "UpdateAttribute",
+              "type": "PROCESSOR"
+            },
+            "flowFileExpiration": "0 sec",
+            "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+            "identifier": "776f1f03-e742-3287-841b-8aedb9ada176",
+            "instanceIdentifier": "5bd1bdd3-0186-1000-dc09-4c6d9bffa220",
+            "labelIndex": 1,
+            "loadBalanceCompression": "DO_NOT_COMPRESS",
+            "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+            "name": "",
+            "partitioningAttribute": "",
+            "prioritizers": [],
+            "selectedRelationships": [
+              ""
+            ],
+            "source": {
+              "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+              "id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+              "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+              "name": "in",
+              "type": "INPUT_PORT"
+            },
+            "zIndex": 0
+          }
+        ],
+        "controllerServices": [],
+        "defaultBackPressureDataSizeThreshold": "1 GB",
+        "defaultBackPressureObjectThreshold": 10000,
+        "defaultFlowFileExpiration": "0 sec",
+        "flowFileConcurrency": "UNBOUNDED",
+        "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+        "funnels": [],
+        "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+        "identifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+        "inputPorts": [
+          {
+            "allowRemoteAccess": false,
+            "componentType": "INPUT_PORT",
+            "concurrentlySchedulableTaskCount": 1,
+            "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+            "identifier": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+            "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+            "name": "in",
+            "position": {
+              "x": 784.0,
+              "y": 146.0
+            },
+            "scheduledState": "ENABLED",
+            "type": "INPUT_PORT"
+          }
+        ],
+        "instanceIdentifier": "5bd1937d-0186-1000-eba8-f831e822907c",
+        "labels": [],
+        "name": "child",
+        "outputPorts": [],
+        "position": {
+          "x": 824.0,
+          "y": 648.0
+        },
+        "processGroups": [],
+        "processors": [
+        ],
+        "remoteProcessGroups": [],
+        "variables": {}
+      }
+    ],
+    "processors": [
+    ],
+    "remoteProcessGroups": [],
+    "variables": {}
+  },
+  "flowEncodingVersion": "1.0",
+  "parameterContexts": {},
+  "snapshotMetadata": {
+    "author": "anonymous",
+    "comments": "",
+    "timestamp": 1676577758403,
+    "version": 1
+  }
+}
\ No newline at end of file
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/port-moved-groups/1/snapshot.json b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/port-moved-groups/1/snapshot.json
new file mode 100644
index 0000000000..4c2d4f0227
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/port-moved-groups/1/snapshot.json
@@ -0,0 +1,305 @@
+{
+  "externalControllerServices": {},
+  "flowContents": {
+    "comments": "",
+    "componentType": "PROCESS_GROUP",
+    "connections": [
+      {
+        "backPressureDataSizeThreshold": "1 GB",
+        "backPressureObjectThreshold": 10000,
+        "bends": [],
+        "componentType": "CONNECTION",
+        "destination": {
+          "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+          "id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+          "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+          "name": "in",
+          "type": "INPUT_PORT"
+        },
+        "flowFileExpiration": "0 sec",
+        "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+        "identifier": "ff659054-0dee-3d85-b88f-6a4fc53639bd",
+        "instanceIdentifier": "5bd1fbc3-0186-1000-ec2a-0600eb0f67c9",
+        "labelIndex": 1,
+        "loadBalanceCompression": "DO_NOT_COMPRESS",
+        "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+        "name": "",
+        "partitioningAttribute": "",
+        "prioritizers": [],
+        "selectedRelationships": [
+          "failure",
+          "success"
+        ],
+        "source": {
+          "comments": "",
+          "groupId": "30330c01-185a-3217-b764-39aef8d2a05f",
+          "id": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
+          "instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
+          "name": "ReplaceText",
+          "type": "PROCESSOR"
+        },
+        "zIndex": 0
+      }
+    ],
+    "controllerServices": [],
+    "defaultBackPressureDataSizeThreshold": "1 GB",
+    "defaultBackPressureObjectThreshold": 10000,
+    "defaultFlowFileExpiration": "0 sec",
+    "flowFileConcurrency": "UNBOUNDED",
+    "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+    "funnels": [],
+    "identifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+    "inputPorts": [],
+    "instanceIdentifier": "5bd17e2a-0186-1000-48b9-aa19b73d5a66",
+    "labels": [],
+    "name": "parent",
+    "outputPorts": [],
+    "position": {
+      "x": 614.0,
+      "y": 462.0
+    },
+    "processGroups": [
+      {
+        "comments": "",
+        "componentType": "PROCESS_GROUP",
+        "connections": [
+          {
+            "backPressureDataSizeThreshold": "1 GB",
+            "backPressureObjectThreshold": 10000,
+            "bends": [],
+            "componentType": "CONNECTION",
+            "destination": {
+              "comments": "",
+              "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+              "id": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
+              "instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
+              "name": "UpdateAttribute",
+              "type": "PROCESSOR"
+            },
+            "flowFileExpiration": "0 sec",
+            "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+            "identifier": "776f1f03-e742-3287-841b-8aedb9ada176",
+            "instanceIdentifier": "5bd1bdd3-0186-1000-dc09-4c6d9bffa220",
+            "labelIndex": 1,
+            "loadBalanceCompression": "DO_NOT_COMPRESS",
+            "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+            "name": "",
+            "partitioningAttribute": "",
+            "prioritizers": [],
+            "selectedRelationships": [
+              ""
+            ],
+            "source": {
+              "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+              "id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+              "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+              "name": "in",
+              "type": "INPUT_PORT"
+            },
+            "zIndex": 0
+          }
+        ],
+        "controllerServices": [],
+        "defaultBackPressureDataSizeThreshold": "1 GB",
+        "defaultBackPressureObjectThreshold": 10000,
+        "defaultFlowFileExpiration": "0 sec",
+        "flowFileConcurrency": "UNBOUNDED",
+        "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+        "funnels": [],
+        "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+        "identifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+        "inputPorts": [
+          {
+            "allowRemoteAccess": false,
+            "componentType": "INPUT_PORT",
+            "concurrentlySchedulableTaskCount": 1,
+            "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+            "identifier": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+            "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+            "name": "in",
+            "position": {
+              "x": 784.0,
+              "y": 146.0
+            },
+            "scheduledState": "ENABLED",
+            "type": "INPUT_PORT"
+          }
+        ],
+        "instanceIdentifier": "5bd1937d-0186-1000-eba8-f831e822907c",
+        "labels": [],
+        "name": "child",
+        "outputPorts": [],
+        "position": {
+          "x": 824.0,
+          "y": 648.0
+        },
+        "processGroups": [],
+        "processors": [
+          {
+            "autoTerminatedRelationships": [],
+            "backoffMechanism": "PENALIZE_FLOWFILE",
+            "bulletinLevel": "WARN",
+            "bundle": {
+              "artifact": "nifi-update-attribute-nar",
+              "group": "org.apache.nifi",
+              "version": "1.16.0.2.1.4.0-53"
+            },
+            "comments": "",
+            "componentType": "PROCESSOR",
+            "concurrentlySchedulableTaskCount": 1,
+            "executionNode": "ALL",
+            "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+            "identifier": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
+            "instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
+            "maxBackoffPeriod": "10 mins",
+            "name": "UpdateAttribute",
+            "penaltyDuration": "30 sec",
+            "position": {
+              "x": 735.0,
+              "y": 310.0
+            },
+            "properties": {
+              "Store State": "Do not store state",
+              "canonical-value-lookup-cache-size": "100"
+            },
+            "propertyDescriptors": {
+              "Delete Attributes Expression": {
+                "displayName": "Delete Attributes Expression",
+                "identifiesControllerService": false,
+                "name": "Delete Attributes Expression",
+                "sensitive": false
+              },
+              "Store State": {
+                "displayName": "Store State",
+                "identifiesControllerService": false,
+                "name": "Store State",
+                "sensitive": false
+              },
+              "canonical-value-lookup-cache-size": {
+                "displayName": "Cache Value Lookup Cache Size",
+                "identifiesControllerService": false,
+                "name": "canonical-value-lookup-cache-size",
+                "sensitive": false
+              },
+              "Stateful Variables Initial Value": {
+                "displayName": "Stateful Variables Initial Value",
+                "identifiesControllerService": false,
+                "name": "Stateful Variables Initial Value",
+                "sensitive": false
+              }
+            },
+            "retriedRelationships": [],
+            "retryCount": 10,
+            "runDurationMillis": 0,
+            "scheduledState": "ENABLED",
+            "schedulingPeriod": "0 sec",
+            "schedulingStrategy": "TIMER_DRIVEN",
+            "style": {},
+            "type": "org.apache.nifi.processors.attributes.UpdateAttribute",
+            "yieldDuration": "1 sec"
+          }
+        ],
+        "remoteProcessGroups": [],
+        "variables": {}
+      }
+    ],
+    "processors": [
+      {
+        "autoTerminatedRelationships": [],
+        "backoffMechanism": "PENALIZE_FLOWFILE",
+        "bulletinLevel": "WARN",
+        "bundle": {
+          "artifact": "nifi-standard-nar",
+          "group": "org.apache.nifi",
+          "version": "1.16.0.2.1.4.0-53"
+        },
+        "comments": "",
+        "componentType": "PROCESSOR",
+        "concurrentlySchedulableTaskCount": 1,
+        "executionNode": "ALL",
+        "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+        "identifier": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
+        "instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
+        "maxBackoffPeriod": "10 mins",
+        "name": "ReplaceText",
+        "penaltyDuration": "30 sec",
+        "position": {
+          "x": 848.0,
+          "y": 384.0
+        },
+        "properties": {
+          "Regular Expression": "(?s)(^.*$)",
+          "Replacement Value": "$1",
+          "Evaluation Mode": "Line-by-Line",
+          "Line-by-Line Evaluation Mode": "All",
+          "Character Set": "UTF-8",
+          "Maximum Buffer Size": "1 MB",
+          "Replacement Strategy": "Regex Replace"
+        },
+        "propertyDescriptors": {
+          "Regular Expression": {
+            "displayName": "Search Value",
+            "identifiesControllerService": false,
+            "name": "Regular Expression",
+            "sensitive": false
+          },
+          "Replacement Value": {
+            "displayName": "Replacement Value",
+            "identifiesControllerService": false,
+            "name": "Replacement Value",
+            "sensitive": false
+          },
+          "Evaluation Mode": {
+            "displayName": "Evaluation Mode",
+            "identifiesControllerService": false,
+            "name": "Evaluation Mode",
+            "sensitive": false
+          },
+          "Line-by-Line Evaluation Mode": {
+            "displayName": "Line-by-Line Evaluation Mode",
+            "identifiesControllerService": false,
+            "name": "Line-by-Line Evaluation Mode",
+            "sensitive": false
+          },
+          "Character Set": {
+            "displayName": "Character Set",
+            "identifiesControllerService": false,
+            "name": "Character Set",
+            "sensitive": false
+          },
+          "Maximum Buffer Size": {
+            "displayName": "Maximum Buffer Size",
+            "identifiesControllerService": false,
+            "name": "Maximum Buffer Size",
+            "sensitive": false
+          },
+          "Replacement Strategy": {
+            "displayName": "Replacement Strategy",
+            "identifiesControllerService": false,
+            "name": "Replacement Strategy",
+            "sensitive": false
+          }
+        },
+        "retriedRelationships": [],
+        "retryCount": 10,
+        "runDurationMillis": 0,
+        "scheduledState": "ENABLED",
+        "schedulingPeriod": "0 sec",
+        "schedulingStrategy": "TIMER_DRIVEN",
+        "style": {},
+        "type": "org.apache.nifi.processors.standard.ReplaceText",
+        "yieldDuration": "1 sec"
+      }
+    ],
+    "remoteProcessGroups": [],
+    "variables": {}
+  },
+  "flowEncodingVersion": "1.0",
+  "parameterContexts": {},
+  "snapshotMetadata": {
+    "author": "anonymous",
+    "comments": "",
+    "timestamp": 1676577758403,
+    "version": 1
+  }
+}
\ No newline at end of file
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/port-moved-groups/2/snapshot.json b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/port-moved-groups/2/snapshot.json
new file mode 100644
index 0000000000..bc80a0a74b
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/port-moved-groups/2/snapshot.json
@@ -0,0 +1 @@
+{"externalControllerServices":{},"flowContents":{"comments":"","componentType":"PROCESS_GROUP","connections":[{"backPressureDataSizeThreshold":"1 GB","backPressureObjectThreshold":10000,"bends":[],"componentType":"CONNECTION","destination":{"comments":"","groupId":"30330c01-185a-3217-b764-39aef8d2a05f","id":"941ef216-74ef-3225-bf73-a9e8b167c0a8","instanceIdentifier":"5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b","name":"UpdateAttribute","type":"PROCESSOR"},"flowFileExpiration":"0 sec","groupIden [...]
\ No newline at end of file
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java
index 5d954e3f0b..edbf08fe99 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java
@@ -19,6 +19,7 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi;
 import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
 import org.apache.nifi.web.api.entity.FlowEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
@@ -70,4 +71,5 @@ public interface ProcessGroupClient {
     FlowEntity copySnippet(String processGroupId, CopySnippetRequestEntity copySnippetRequestEntity)
         throws NiFiClientException, IOException;
 
+    FlowComparisonEntity getLocalModifications(String processGroupId) throws NiFiClientException, IOException;
 }
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java
index 5bf9b5fccc..4f2ad9e290 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java
@@ -23,6 +23,7 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
 import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
 import org.apache.nifi.web.api.entity.FlowEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
@@ -333,4 +334,19 @@ public class JerseyProcessGroupClient extends AbstractJerseyClient implements Pr
         });
     }
 
+    @Override
+    public FlowComparisonEntity getLocalModifications(final String processGroupId) throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(processGroupId)) {
+            throw new IllegalArgumentException("Process group id cannot be null or blank");
+        }
+
+        return executeAction("Error retrieving list of local flow modifications", () -> {
+            final WebTarget target = processGroupsTarget
+                .path("{id}/local-modifications")
+                .resolveTemplate("id", processGroupId);
+
+            return getRequestBuilder(target).get(FlowComparisonEntity.class);
+        });
+    }
+
 }