You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:06 UTC

[18/50] nifi git commit: NIFI-4436: Bug fixes - Checkpoint before allowing multiple Process Groups with same Versioned Component ID and same parent - Ensure that if flow update is cancelled while processors are being stopped/services disabled that we sto

NIFI-4436: Bug fixes - Checkpoint before allowing multiple Process Groups with same Versioned Component ID and same parent - Ensure that if flow update is cancelled while processors are being stopped/services disabled that we stop waiting for that to occur. Also ensure that if we fail to update flow that we re-enable/restart the processors and services - Updated verbiage to use a ConciseEvolvingDifferentDescriptor when getting local modifications for a versioned flow - Do not allow outer process group to be saved to flow registry or have local modifications reverted if it has a descendant process group that is under version control and is dirty. Fixed bug where ComponentDifferenceDTO was populated with wrong component id and group id

Signed-off-by: Matt Gilman <ma...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/adacb204
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/adacb204
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/adacb204

Branch: refs/heads/master
Commit: adacb204a8b6518a79600253463a36c9f8afaa37
Parents: 3d8b1e4
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Nov 17 11:02:33 2017 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Jan 8 12:44:53 2018 -0500

----------------------------------------------------------------------
 .../api/entity/VersionedFlowSnapshotEntity.java |  11 +
 .../org/apache/nifi/groups/ProcessGroup.java    |  27 +-
 .../apache/nifi/registry/flow/FlowRegistry.java |   9 +-
 .../apache/nifi/controller/FlowController.java  |  54 +++-
 .../nifi/groups/StandardProcessGroup.java       | 143 ++++++++--
 .../registry/flow/RestBasedFlowRegistry.java    |  17 +-
 .../service/mock/MockProcessGroup.java          |  14 +-
 .../org/apache/nifi/web/NiFiServiceFacade.java  |  34 ++-
 .../nifi/web/StandardNiFiServiceFacade.java     |  93 ++-----
 .../nifi/web/api/ProcessGroupResource.java      |  11 +-
 .../apache/nifi/web/api/VersionsResource.java   | 273 +++++++++----------
 .../api/concurrent/AsynchronousWebRequest.java  |   8 +
 .../StandardAsynchronousWebRequest.java         |   7 +
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  15 +-
 .../apache/nifi/web/dao/ProcessGroupDAO.java    |   4 +-
 .../web/dao/impl/StandardProcessGroupDAO.java   |  10 +-
 .../nifi/web/util/CancellableTimedPause.java    |   2 +-
 .../org/apache/nifi/web/util/SnippetUtils.java  |  16 +-
 18 files changed, 490 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java
index 170640d..2faf791 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java
@@ -28,6 +28,7 @@ public class VersionedFlowSnapshotEntity extends Entity {
     private VersionedFlowSnapshot versionedFlowSnapshot;
     private RevisionDTO processGroupRevision;
     private String registryId;
+    private Boolean updateDescendantVersionedFlows;
 
     @ApiModelProperty("The versioned flow snapshot")
     public VersionedFlowSnapshot getVersionedFlowSnapshot() {
@@ -55,4 +56,14 @@ public class VersionedFlowSnapshotEntity extends Entity {
     public void setRegistryId(String registryId) {
         this.registryId = registryId;
     }
+
+    @ApiModelProperty("If the Process Group to be updated has a child or descendant Process Group that is also under "
+        + "Version Control, this specifies whether or not the contents of that child/descendant Process Group should be updated.")
+    public Boolean getUpdateDescendantVersionedFlows() {
+        return updateDescendantVersionedFlows;
+    }
+
+    public void setUpdateDescendantVersionedFlows(Boolean update) {
+        this.updateDescendantVersionedFlows = update;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index 16b4b5e..d81b7d3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -784,8 +784,10 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
      *            and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will
      *            throw an IllegalStateException
      * @param updateSettings whether or not to update the process group's name and positions
+     * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
+     *            update the contents of that Process Group
      */
-    void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings);
+    void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVersionedFlows);
 
     /**
      * Verifies a template with the specified name can be created.
@@ -848,7 +850,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
     void verifyCanUpdateVariables(Map<String, String> updatedVariables);
 
     /**
-     * Ensure that the contents of the Process Group can be update to match the given new flow
+     * Ensures that the contents of the Process Group can be update to match the given new flow
      *
      * @param updatedFlow the updated version of the flow
      * @param verifyConnectionRemoval whether or not to verify that connections that are not present in the updated flow can be removed
@@ -860,6 +862,27 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
     void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty);
 
     /**
+     * Ensures that the Process Group can have any local changes reverted
+     *
+     * @throws IllegalStateException if the Process Group is not in a state that will allow local changes to be reverted
+     */
+    void verifyCanRevertLocalModifications();
+
+    /**
+     * Ensures that the Process Group can have its local modifications shown
+     *
+     * @throws IllegalStateException if the Process Group is not in a state that will allow local modifications to be shown
+     */
+    void verifyCanShowLocalModifications();
+
+    /**
+     * Ensure that the contents of the Process Group can be saved to a Flow Registry in its current state
+     *
+     * @throws IllegalStateException if the Process Group cannot currently be saved to a Flow Registry
+     */
+    void verifyCanSaveToFlowRegistry(String registryId, String bucketId, String flowId);
+
+    /**
      * Adds the given template to this Process Group
      *
      * @param template the template to add

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
index 76f96f2..ae43bb5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
@@ -159,6 +159,9 @@ public interface FlowRegistry {
      * @param bucketId the ID of the bucket
      * @param flowId the ID of the flow
      * @param version the version to retrieve
+     * @param fetchRemoteFlows if the remote flow has a child Process Group that also tracks to a remote flow, this specifies whether or not
+     *            the child's contents should be fetched.
+     * @param user the user on whose behalf the flow contents are being retrieved
      * @return the contents of the Flow from the Flow Registry
      *
      * @throws IOException if unable to communicate with the Flow Registry
@@ -167,7 +170,7 @@ public interface FlowRegistry {
      * @throws NullPointerException if any of the arguments is not specified
      * @throws IllegalArgumentException if the given version is less than 1
      */
-    VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, NiFiUser user) throws IOException, NiFiRegistryException;
+    VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, boolean fetchRemoteFlows, NiFiUser user) throws IOException, NiFiRegistryException;
 
     /**
      * Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry
@@ -175,6 +178,8 @@ public interface FlowRegistry {
      * @param bucketId the ID of the bucket
      * @param flowId the ID of the flow
      * @param version the version to retrieve
+     * @param fetchRemoteFlows if the remote flow has a child Process Group that also tracks to a remote flow, this specifies whether or not
+     *            the child's contents should be fetched.
      * @return the contents of the Flow from the Flow Registry
      *
      * @throws IOException if unable to communicate with the Flow Registry
@@ -183,7 +188,7 @@ public interface FlowRegistry {
      * @throws NullPointerException if any of the arguments is not specified
      * @throws IllegalArgumentException if the given version is less than 1
      */
-    VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, NiFiRegistryException;
+    VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, boolean fetchRemoteFlows) throws IOException, NiFiRegistryException;
 
     /**
      * Retrieves a VersionedFlow by bucket id and flow id

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 5ed5b6e..3909387 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -165,8 +165,11 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.registry.flow.VersionedConnection;
 import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
 import org.apache.nifi.registry.variable.MutableVariableRegistry;
 import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
 import org.apache.nifi.remote.HttpRemoteSiteListener;
@@ -1775,6 +1778,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * processor
      */
     public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
+        instantiateSnippet(group, dto, true);
+    }
+
+    private void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto, final boolean topLevel) throws ProcessorInstantiationException {
         writeLock.lock();
         try {
             validateSnippetContents(requireNonNull(group), dto);
@@ -1789,6 +1796,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
                 serviceNode.setComments(controllerServiceDTO.getComments());
                 serviceNode.setName(controllerServiceDTO.getName());
+                if (!topLevel) {
+                    serviceNode.setVersionedComponentId(controllerServiceDTO.getVersionedComponentId());
+                }
 
                 group.addControllerService(serviceNode);
             }
@@ -1812,6 +1822,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 }
 
                 label.setStyle(labelDTO.getStyle());
+                if (!topLevel) {
+                    label.setVersionedComponentId(labelDTO.getVersionedComponentId());
+                }
+
                 group.addLabel(label);
             }
 
@@ -1819,6 +1833,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             for (final FunnelDTO funnelDTO : dto.getFunnels()) {
                 final Funnel funnel = createFunnel(funnelDTO.getId());
                 funnel.setPosition(toPosition(funnelDTO.getPosition()));
+                if (!topLevel) {
+                    funnel.setVersionedComponentId(funnelDTO.getVersionedComponentId());
+                }
+
                 group.addFunnel(funnel);
             }
 
@@ -1840,6 +1858,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     inputPort = createLocalInputPort(portDTO.getId(), portDTO.getName());
                 }
 
+                if (!topLevel) {
+                    inputPort.setVersionedComponentId(portDTO.getVersionedComponentId());
+                }
                 inputPort.setPosition(toPosition(portDTO.getPosition()));
                 inputPort.setProcessGroup(group);
                 inputPort.setComments(portDTO.getComments());
@@ -1861,6 +1882,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     outputPort = createLocalOutputPort(portDTO.getId(), portDTO.getName());
                 }
 
+                if (!topLevel) {
+                    outputPort.setVersionedComponentId(portDTO.getVersionedComponentId());
+                }
                 outputPort.setPosition(toPosition(portDTO.getPosition()));
                 outputPort.setProcessGroup(group);
                 outputPort.setComments(portDTO.getComments());
@@ -1876,6 +1900,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
                 procNode.setPosition(toPosition(processorDTO.getPosition()));
                 procNode.setProcessGroup(group);
+                if (!topLevel) {
+                    procNode.setVersionedComponentId(processorDTO.getVersionedComponentId());
+                }
 
                 final ProcessorConfigDTO config = processorDTO.getConfig();
                 procNode.setComments(config.getComments());
@@ -1936,6 +1963,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition()));
                 remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout());
                 remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration());
+                if (!topLevel) {
+                    remoteGroup.setVersionedComponentId(remoteGroupDTO.getVersionedComponentId());
+                }
+
                 if (remoteGroupDTO.getTransportProtocol() == null) {
                     remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.RAW);
                 } else {
@@ -1979,6 +2010,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     childGroup.setVariables(groupDTO.getVariables());
                 }
 
+                // If this Process Group is 'top level' then we do not set versioned component ID's.
+                // We do this only if this component is the child of a Versioned Component.
+                if (!topLevel) {
+                    childGroup.setVersionedComponentId(groupDTO.getVersionedComponentId());
+                }
+
                 group.addProcessGroup(childGroup);
 
                 final FlowSnippetDTO contents = groupDTO.getContents();
@@ -1995,7 +2032,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 childTemplateDTO.setFunnels(contents.getFunnels());
                 childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups());
                 childTemplateDTO.setControllerServices(contents.getControllerServices());
-                instantiateSnippet(childGroup, childTemplateDTO);
+                instantiateSnippet(childGroup, childTemplateDTO, false);
+
+                if (groupDTO.getVersionControlInformation() != null) {
+                    final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper();
+                    final VersionedProcessGroup versionedGroup = flowMapper.mapProcessGroup(childGroup, getFlowRegistryClient(), false);
+
+                    final VersionControlInformation vci = StandardVersionControlInformation.Builder
+                        .fromDto(groupDTO.getVersionControlInformation())
+                        .flowSnapshot(versionedGroup)
+                        .build();
+                    childGroup.setVersionControlInformation(vci, Collections.emptyMap());
+                }
             }
 
             //
@@ -2039,6 +2087,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 }
 
                 final Connection connection = createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships);
+                if (!topLevel) {
+                    connection.setVersionedComponentId(connectionDTO.getVersionedComponentId());
+                }
 
                 if (connectionDTO.getBends() != null) {
                     final List<Position> bendPoints = new ArrayList<>();
@@ -2088,6 +2139,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             for (final RemoteProcessGroupPortDTO port : ports) {
                 final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
                 descriptor.setId(port.getId());
+                descriptor.setVersionedComponentId(port.getVersionedComponentId());
                 descriptor.setTargetId(port.getTargetId());
                 descriptor.setName(port.getName());
                 descriptor.setComments(port.getComments());

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index d1aa4e2..51839d0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -2821,7 +2821,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             versionControlInformation.getBucketIdentifier(),
             versionControlInformation.getFlowIdentifier(),
             versionControlInformation.getVersion(),
-            versionControlInformation.getFlowSnapshot(),
+            stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true),
             versionControlInformation.isModified(),
             versionControlInformation.isCurrent()) {
 
@@ -2849,6 +2849,51 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
     }
 
+    private VersionedProcessGroup stripContentsFromRemoteDescendantGroups(final VersionedProcessGroup processGroup, final boolean topLevel) {
+        if (processGroup == null) {
+            return null;
+        }
+
+        final VersionedProcessGroup copy = new VersionedProcessGroup();
+        copy.setComments(processGroup.getComments());
+        copy.setComponentType(processGroup.getComponentType());
+        copy.setGroupIdentifier(processGroup.getGroupIdentifier());
+        copy.setIdentifier(processGroup.getIdentifier());
+        copy.setName(processGroup.getName());
+        copy.setPosition(processGroup.getPosition());
+        copy.setVersionedFlowCoordinates(topLevel ? null : processGroup.getVersionedFlowCoordinates());
+        copy.setConnections(processGroup.getConnections());
+        copy.setControllerServices(processGroup.getControllerServices());
+        copy.setFunnels(processGroup.getFunnels());
+        copy.setInputPorts(processGroup.getInputPorts());
+        copy.setOutputPorts(processGroup.getOutputPorts());
+        copy.setProcessors(processGroup.getProcessors());
+        copy.setRemoteProcessGroups(processGroup.getRemoteProcessGroups());
+        copy.setVariables(processGroup.getVariables());
+
+        final Set<VersionedProcessGroup> copyChildren = new HashSet<>();
+
+        for (final VersionedProcessGroup childGroup : processGroup.getProcessGroups()) {
+            if (childGroup.getVersionedFlowCoordinates() == null) {
+                copyChildren.add(stripContentsFromRemoteDescendantGroups(childGroup, false));
+            } else {
+                final VersionedProcessGroup childCopy = new VersionedProcessGroup();
+                childCopy.setComments(childGroup.getComments());
+                childCopy.setComponentType(childGroup.getComponentType());
+                childCopy.setGroupIdentifier(childGroup.getGroupIdentifier());
+                childCopy.setIdentifier(childGroup.getIdentifier());
+                childCopy.setName(childGroup.getName());
+                childCopy.setPosition(childGroup.getPosition());
+                childCopy.setVersionedFlowCoordinates(childGroup.getVersionedFlowCoordinates());
+
+                copyChildren.add(childCopy);
+            }
+        }
+
+        copy.setProcessGroups(copyChildren);
+        return copy;
+    }
+
     @Override
     public void disconnectVersionControl() {
         writeLock.lock();
@@ -2900,7 +2945,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             });
 
         processGroup.getProcessGroups().stream()
-            .filter(childGroup -> childGroup.getVersionControlInformation() != null)
+            .filter(childGroup -> childGroup.getVersionControlInformation() == null)
             .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup));
     }
 
@@ -2925,7 +2970,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             // We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry.
             // This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry.
             try {
-                final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion());
+                final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion(), false);
                 final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents();
                 vci.setFlowSnapshot(registryFlow);
             } catch (final IOException | NiFiRegistryException e) {
@@ -2958,7 +3003,8 @@ public final class StandardProcessGroup implements ProcessGroup {
 
 
     @Override
-    public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings) {
+    public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings,
+            final boolean updateDescendantVersionedFlows) {
         writeLock.lock();
         try {
             verifyCanUpdate(proposedSnapshot, true, verifyNotDirty);
@@ -2986,7 +3032,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             }
 
             final Set<String> knownVariables = getKnownVariableNames();
-            updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, knownVariables);
+            updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, updateDescendantVersionedFlows, knownVariables);
         } catch (final ProcessorInstantiationException pie) {
             throw new RuntimeException(pie);
         } finally {
@@ -3013,7 +3059,8 @@ public final class StandardProcessGroup implements ProcessGroup {
 
 
     private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed,
-        final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final Set<String> variablesToSkip) throws ProcessorInstantiationException {
+        final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final boolean updateDescendantVersionedGroups,
+        final Set<String> variablesToSkip) throws ProcessorInstantiationException {
 
         group.setComments(proposed.getComments());
 
@@ -3033,14 +3080,8 @@ public final class StandardProcessGroup implements ProcessGroup {
             .map(VariableDescriptor::getName)
             .collect(Collectors.toSet());
 
-        final Set<String> variablesRemoved = new HashSet<>(existingVariableNames);
-
-        if (proposed.getVariables() != null) {
-            variablesRemoved.removeAll(proposed.getVariables().keySet());
-        }
 
         final Map<String, String> updatedVariableMap = new HashMap<>();
-        variablesRemoved.forEach(var -> updatedVariableMap.put(var, null));
 
         // If any new variables exist in the proposed flow, add those to the variable registry.
         for (final Map.Entry<String, String> entry : proposed.getVariables().entrySet()) {
@@ -3069,6 +3110,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 .flowId(flowId)
                 .flowName(flowId) // flow id not yet known
                 .version(version)
+                .flowSnapshot(proposed)
                 .modified(false)
                 .current(true)
                 .build();
@@ -3084,11 +3126,13 @@ public final class StandardProcessGroup implements ProcessGroup {
         for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
             final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
 
+            final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates();
+
             if (childGroup == null) {
                 final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip);
                 LOG.info("Added {} to {}", added, this);
-            } else {
-                updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, variablesToSkip);
+            } else if (childCoordinates == null || updateDescendantVersionedGroups) {
+                updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, updateDescendantVersionedGroups, variablesToSkip);
                 LOG.info("Updated {}", childGroup);
             }
 
@@ -3367,7 +3411,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed));
         group.setVersionedComponentId(proposed.getIdentifier());
         group.setParent(destination);
-        updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, variablesToSkip);
+        updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, true, variablesToSkip);
         destination.addProcessGroup(group);
         return group;
     }
@@ -3739,7 +3783,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
 
         final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
-        final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), true);
+        final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false);
 
         final ComparableDataFlow currentFlow = new ComparableDataFlow() {
             @Override
@@ -3765,7 +3809,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             }
         };
 
-        final FlowComparator flowComparator = new StandardFlowComparator(currentFlow, snapshotFlow, new EvolvingDifferenceDescriptor());
+        final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor());
         final FlowComparison comparison = flowComparator.compare();
         final Set<FlowDifference> differences = comparison.getDifferences();
         final Set<FlowDifference> functionalDifferences = differences.stream()
@@ -4002,4 +4046,69 @@ public final class StandardProcessGroup implements ProcessGroup {
             findAllProcessGroups(child, map);
         }
     }
+
+    @Override
+    public void verifyCanSaveToFlowRegistry(final String registryId, final String bucketId, final String flowId) {
+        verifyNoDescendantsWithLocalModifications("be saved to a Flow Registry");
+
+        final StandardVersionControlInformation vci = versionControlInfo.get();
+        if (vci != null) {
+            if (flowId != null && flowId.equals(vci.getFlowIdentifier())) {
+                // Flow ID is the same. We want to publish the Process Group as the next version of the Flow.
+                // In order to do this, we have to ensure that the Process Group is 'current'.
+                final boolean current = vci.isCurrent();
+                if (!current) {
+                    throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+                        + " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. "
+                        + "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry.");
+                }
+
+                // Flow ID matches. We want to publish the Process Group as the next version of the Flow, so we must
+                // ensure that all other parameters match as well.
+                if (!bucketId.equals(vci.getBucketIdentifier())) {
+                    throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+                        + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
+                }
+
+                if (!registryId.equals(vci.getRegistryIdentifier())) {
+                    throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+                        + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
+                }
+            } else if (flowId != null) {
+                // Flow ID is specified but different. This is not allowed, because Flow ID's are automatically generated,
+                // and if the client is specifying an ID then it is either trying to assign the ID of the Flow or it is
+                // attempting to save a new version of a different flow. Saving a new version of a different Flow is
+                // not allowed because the Process Group must be in synch with the latest version of the flow before that
+                // can be done.
+                throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+                    + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
+            }
+        }
+    }
+
+    @Override
+    public void verifyCanRevertLocalModifications() {
+        final StandardVersionControlInformation svci = versionControlInfo.get();
+        if (svci == null) {
+            throw new IllegalStateException("Cannot revert local modifications to Process Group because the Process Group is not under Version Control.");
+        }
+
+        verifyNoDescendantsWithLocalModifications("have its local modifications reverted");
+    }
+
+    @Override
+    public void verifyCanShowLocalModifications() {
+
+    }
+
+    private void verifyNoDescendantsWithLocalModifications(final String action) {
+        for (final ProcessGroup descendant : findAllProcessGroups()) {
+            final VersionControlInformation descendantVci = descendant.getVersionControlInformation();
+            if (descendantVci != null && descendantVci.isModified()) {
+                throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and "
+                    + "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before "
+                    + "this action can be performed on the parent Process Group.");
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
index 8bf89c6..1d3eec6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
@@ -178,21 +178,24 @@ public class RestBasedFlowRegistry implements FlowRegistry {
     }
 
     @Override
-    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final NiFiUser user) throws IOException, NiFiRegistryException {
+    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows, final NiFiUser user)
+            throws IOException, NiFiRegistryException {
         final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
         final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version);
 
-        final VersionedProcessGroup contents = flowSnapshot.getFlowContents();
-        for (final VersionedProcessGroup child : contents.getProcessGroups()) {
-            populateVersionedContentsRecursively(child, user);
+        if (fetchRemoteFlows) {
+            final VersionedProcessGroup contents = flowSnapshot.getFlowContents();
+            for (final VersionedProcessGroup child : contents.getProcessGroups()) {
+                populateVersionedContentsRecursively(child, user);
+            }
         }
 
         return flowSnapshot;
     }
 
     @Override
-    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException {
-        return getFlowContents(bucketId, flowId, version, null);
+    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows) throws IOException, NiFiRegistryException {
+        return getFlowContents(bucketId, flowId, version, fetchRemoteFlows, null);
     }
 
     private void populateVersionedContentsRecursively(final VersionedProcessGroup group, final NiFiUser user) throws NiFiRegistryException, IOException {
@@ -214,7 +217,7 @@ public class RestBasedFlowRegistry implements FlowRegistry {
             }
 
             final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
-            final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version, user);
+            final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version, true, user);
             final VersionedProcessGroup contents = snapshot.getFlowContents();
 
             group.setComments(contents.getComments());

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index db4ac59..ef69906 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -650,11 +650,15 @@ public class MockProcessGroup implements ProcessGroup {
     }
 
     @Override
+    public void verifyCanSaveToFlowRegistry(String registryId, String bucketId, String flowId) {
+    }
+
+    @Override
     public void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry) {
     }
 
     @Override
-    public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings) {
+    public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVerisonedFlows) {
     }
 
     @Override
@@ -666,4 +670,12 @@ public class MockProcessGroup implements ProcessGroup {
     public void disconnectVersionControl() {
         this.versionControlInfo = null;
     }
+
+    @Override
+    public void verifyCanRevertLocalModifications() {
+    }
+
+    @Override
+    public void verifyCanShowLocalModifications() {
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 76cd2c4..02df16b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1368,11 +1368,13 @@ public interface NiFiServiceFacade {
      * Retrieves the Versioned Flow Snapshot for the coordinates provided by the given Version Control Information DTO
      *
      * @param versionControlInfo the coordinates of the versioned flow
+     * @param fetchRemoteFlows if the contents of Versioned Flow that is fetched contains a child/descendant Process Group
+     *            that is also under Version Control, this indicates whether that remote flow should also be fetched
      * @return the VersionedFlowSnapshot that corresponds to the given coordinates
      *
      * @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found
      */
-    VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo) throws IOException;
+    VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows) throws IOException;
 
     /**
      * Returns the name of the Flow Registry that is registered with the given ID. If no Flow Registry exists with the given ID, will return
@@ -1407,6 +1409,28 @@ public interface NiFiServiceFacade {
     void verifyCanUpdate(String groupId, VersionedFlowSnapshot proposedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty);
 
     /**
+     * Verifies that the Process Group with the given identifier can be saved to the flow registry
+     *
+     * @param groupId the ID of the Process Group
+     * @param registryId the ID of the Flow Registry
+     * @param bucketId the ID of the bucket
+     * @param flowId the ID of the flow
+     *
+     * @throws IllegalStateException if the Process Group cannot be saved to the flow registry with the coordinates specified
+     */
+    void verifyCanSaveToFlowRegistry(String groupId, String registryId, String bucketId, String flowId);
+
+    /**
+     * Verifies that the Process Group with the given identifier can have its local modifications reverted to the given VersionedFlowSnapshot
+     *
+     * @param groupId the ID of the Process Group
+     * @param versionedFlowSnapshot the Versioned Flow Snapshot
+     *
+     * @throws IllegalStateException if the Process Group cannot have its local modifications reverted
+     */
+    void verifyCanRevertLocalModifications(String groupId, VersionedFlowSnapshot versionedFlowSnapshot);
+
+    /**
      * Updates the Process group with the given ID to match the new snapshot
      *
      * @param revision the revision of the Process Group
@@ -1414,10 +1438,12 @@ public interface NiFiServiceFacade {
      * @param versionControlInfo the Version Control information
      * @param snapshot the new snapshot
      * @param componentIdSeed the seed to use for generating new component ID's
+     * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
+     *            update the contents of that Process Group
      * @return the Process Group
      */
     ProcessGroupEntity updateProcessGroup(Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
-        boolean verifyNotModified);
+        boolean verifyNotModified, boolean updateDescendantVersionedFlows);
 
     /**
      * Updates the Process group with the given ID to match the new snapshot
@@ -1429,10 +1455,12 @@ public interface NiFiServiceFacade {
      * @param snapshot the new snapshot
      * @param componentIdSeed the seed to use for generating new component ID's
      * @param updateSettings whether or not the process group's name and position should be updated
+     * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
+     *            update the contents of that Process Group
      * @return the Process Group
      */
     ProcessGroupEntity updateProcessGroupContents(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
-        boolean verifyNotModified, boolean updateSettings);
+        boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows);
 
     // ----------------------------------------
     // Component state methods

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 4d1bbbc..c66aebb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -97,13 +97,12 @@ import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.registry.flow.VersionedComponent;
 import org.apache.nifi.registry.flow.VersionedConnection;
 import org.apache.nifi.registry.flow.VersionedFlow;
-import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
 import org.apache.nifi.registry.flow.VersionedProcessGroup;
 import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
 import org.apache.nifi.registry.flow.diff.DifferenceType;
-import org.apache.nifi.registry.flow.diff.EvolvingDifferenceDescriptor;
 import org.apache.nifi.registry.flow.diff.FlowComparator;
 import org.apache.nifi.registry.flow.diff.FlowComparison;
 import org.apache.nifi.registry.flow.diff.FlowDifference;
@@ -3751,10 +3750,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         }
 
         final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
-            versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser());
+            versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), false, NiFiUserUtils.getNiFiUser());
 
         final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
-        final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, true);
+        final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false);
         final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
 
         final ComparableDataFlow localFlow = new ComparableDataFlow() {
@@ -3781,7 +3780,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             }
         };
 
-        final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new EvolvingDifferenceDescriptor());
+        final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor());
         final FlowComparison flowComparison = flowComparator.compare();
 
         final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison);
@@ -3853,6 +3852,23 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public void verifyCanSaveToFlowRegistry(final String groupId, final String registryId, final String bucketId, final String flowId) {
+        final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
+        group.verifyCanSaveToFlowRegistry(registryId, bucketId, flowId);
+    }
+
+    @Override
+    public void verifyCanRevertLocalModifications(final String groupId, final VersionedFlowSnapshot versionedFlowSnapshot) {
+        final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
+        group.verifyCanRevertLocalModifications();
+
+        // verify that the process group can be updated to the given snapshot. We do not verify that connections can
+        // be removed, because the flow may still be running, and it only matters that the connections can be removed once the components
+        // have been stopped.
+        group.verifyCanUpdate(versionedFlowSnapshot, false, false);
+    }
+
+    @Override
     public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) throws IOException {
         final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
 
@@ -4028,7 +4044,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo) throws IOException {
+    public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) throws IOException {
         final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId());
         if (flowRegistry == null) {
             throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId());
@@ -4036,15 +4052,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
         final VersionedFlowSnapshot snapshot;
         try {
-            snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser());
+            snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
         } catch (final NiFiRegistryException e) {
             throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
                 + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
         }
 
-        // If this Flow has a reference to a remote flow, we need to pull that remote flow as well
-        populateVersionedChildFlows(snapshot);
-
         return snapshot;
     }
 
@@ -4054,74 +4067,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         return flowRegistry == null ? flowRegistryId : flowRegistry.getName();
     }
 
-    private void populateVersionedChildFlows(final VersionedFlowSnapshot snapshot) throws IOException {
-        final VersionedProcessGroup group = snapshot.getFlowContents();
-
-        for (final VersionedProcessGroup child : group.getProcessGroups()) {
-            populateVersionedFlows(child);
-        }
-    }
-
-    private void populateVersionedFlows(final VersionedProcessGroup group) throws IOException {
-        final VersionedFlowCoordinates remoteCoordinates = group.getVersionedFlowCoordinates();
-
-        if (remoteCoordinates != null) {
-            final String registryUrl = remoteCoordinates.getRegistryUrl();
-            final String registryId = flowRegistryClient.getFlowRegistryId(registryUrl);
-            if (registryId == null) {
-                throw new IllegalArgumentException("Process Group with ID " + group.getIdentifier() + " is under Version Control, referencing a Flow Registry at URL [" + registryUrl
-                    + "], but no Flow Registry is currently registered for that URL.");
-            }
-
-            final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
-
-            final VersionedFlowSnapshot childSnapshot;
-            try {
-                childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion(), NiFiUserUtils.getNiFiUser());
-            } catch (final NiFiRegistryException e) {
-                throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
-                    + remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion());
-            }
-
-            final VersionedProcessGroup fetchedGroup = childSnapshot.getFlowContents();
-            group.setComments(fetchedGroup.getComments());
-            group.setPosition(fetchedGroup.getPosition());
-            group.setName(fetchedGroup.getName());
-            group.setVariables(fetchedGroup.getVariables());
-
-            group.setConnections(new LinkedHashSet<>(fetchedGroup.getConnections()));
-            group.setControllerServices(new LinkedHashSet<>(fetchedGroup.getControllerServices()));
-            group.setFunnels(new LinkedHashSet<>(fetchedGroup.getFunnels()));
-            group.setInputPorts(new LinkedHashSet<>(fetchedGroup.getInputPorts()));
-            group.setLabels(new LinkedHashSet<>(fetchedGroup.getLabels()));
-            group.setOutputPorts(new LinkedHashSet<>(fetchedGroup.getOutputPorts()));
-            group.setProcessGroups(new LinkedHashSet<>(fetchedGroup.getProcessGroups()));
-            group.setProcessors(new LinkedHashSet<>(fetchedGroup.getProcessors()));
-            group.setRemoteProcessGroups(new LinkedHashSet<>(fetchedGroup.getRemoteProcessGroups()));
-        }
-
-        for (final VersionedProcessGroup child : group.getProcessGroups()) {
-            populateVersionedFlows(child);
-        }
-    }
-
-
     @Override
     public ProcessGroupEntity updateProcessGroup(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
-        final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) {
+        final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) {
 
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
-        return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true);
+        return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true, updateDescendantVersionedFlows);
     }
 
     @Override
     public ProcessGroupEntity updateProcessGroupContents(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
-        final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) {
+        final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
 
         final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId);
         final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(user, revision,
             processGroupNode,
-            () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings),
+            () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows),
             processGroup -> dtoFactory.createProcessGroupDto(processGroup));
 
         final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index de56a4f..7262a82 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -1644,7 +1644,7 @@ public class ProcessGroupResource extends ApplicationResource {
         if (versionControlInfo != null) {
             // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
             // Step 2: Retrieve flow from Flow Registry
-            final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo);
+            final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true);
             final Bucket bucket = flowSnapshot.getBucket();
             final VersionedFlow flow = flowSnapshot.getFlow();
 
@@ -1653,6 +1653,8 @@ public class ProcessGroupResource extends ApplicationResource {
             versionControlInfo.setFlowDescription(flow.getDescription());
 
             versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId()));
+            versionControlInfo.setModified(false);
+            versionControlInfo.setCurrent(flowSnapshot.isLatest());
 
             // Step 3: Resolve Bundle info
             BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents());
@@ -1709,8 +1711,13 @@ public class ProcessGroupResource extends ApplicationResource {
                         final RevisionDTO revisionDto = entity.getRevision();
                         final String newGroupId = entity.getComponent().getId();
                         final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId);
+
+                        // We don't want the Process Group's position to be updated because we want to keep the position where the user
+                        // placed the Process Group. However, we do want to use the name of the Process Group that is in the Flow Contents.
+                        // To accomplish this, we call updateProcessGroupContents() passing 'true' for the updateSettings flag but null out the position.
+                        flowSnapshot.getFlowContents().setPosition(null);
                         entity = serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId,
-                            versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, false);
+                        versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, true, true);
                     }
 
                     populateRemainingProcessGroupEntityContent(entity);