You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:05 UTC

[17/50] nifi git commit: NIFI-4436: Bug fixes - Checkpoint before allowing multiple Process Groups with same Versioned Component ID and same parent - Ensure that if flow update is cancelled while processors are being stopped/services disabled that we sto

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 3684f04..f2a207e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -17,21 +17,6 @@
 
 package org.apache.nifi.web.api;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -92,10 +77,24 @@ import org.apache.nifi.web.util.AffectedComponentUtils;
 import org.apache.nifi.web.util.CancellableTimedPause;
 import org.apache.nifi.web.util.ComponentLifecycle;
 import org.apache.nifi.web.util.LifecycleManagementException;
-import org.apache.nifi.web.util.Pause;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -454,51 +453,15 @@ public class VersionsResource extends ApplicationResource {
                 super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
             },
             () -> {
-                final VersionControlInformationEntity entity = serviceFacade.getVersionControlInformation(groupId);
-                if (entity != null) {
-                    final String flowId = requestEntity.getVersionedFlow().getFlowId();
-                    if (flowId != null && flowId.equals(entity.getVersionControlInformation().getFlowId())) {
-                        // Flow ID is the same. We want to publish the Process Group as the next version of the Flow.
-                        // In order to do this, we have to ensure that the Process Group is 'current'.
-                        final Boolean current = entity.getVersionControlInformation().getCurrent();
-                        if (current == null) {
-                            throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
-                                + " because it is not yet known whether or not this Process Group is the most recent version of the flow. "
-                                + "Please try the request again after the Process Group has been synchronized with the Flow Registry.");
-                        }
-
-                        if (current == Boolean.FALSE) {
-                            throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
-                                + " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. "
-                                + "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry.");
-                        }
-
-                        // Flow ID matches. We want to publish the Process Group as the next version of the Flow, so we must
-                        // ensure that all other parameters match as well.
-                        if (!requestEntity.getVersionedFlow().getBucketId().equals(entity.getVersionControlInformation().getBucketId())) {
-                            throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
-                                + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
-                        }
-
-                        if (!requestEntity.getVersionedFlow().getRegistryId().equals(entity.getVersionControlInformation().getRegistryId())) {
-                            throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
-                                + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
-                        }
-
-                    } else if (flowId != null) {
-                        // Flow ID is specified but different. This is not allowed, because Flow ID's are automatically generated,
-                        // and if the client is specifying an ID then it is either trying to assign the ID of the Flow or it is
-                        // attempting to save a new version of a different flow. Saving a new version of a different Flow is
-                        // not allowed because the Process Group must be in synch with the latest version of the flow before that
-                        // can be done.
-                        throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
-                            + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
-                    }
-                }
+                final VersionedFlowDTO versionedFlow = requestEntity.getVersionedFlow();
+                final String registryId = versionedFlow.getRegistryId();
+                final String bucketId = versionedFlow.getBucketId();
+                final String flowId = versionedFlow.getFlowId();
+                serviceFacade.verifyCanSaveToFlowRegistry(groupId, registryId, bucketId, flowId);
             },
             (rev, flowEntity) -> {
                 // Register the current flow with the Flow Registry.
-                final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, requestEntity);
+                final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, flowEntity);
 
                 // Update the Process Group's Version Control Information
                 final VersionControlInformationEntity responseEntity = serviceFacade.setVersionControlInformation(rev, groupId,
@@ -756,7 +719,8 @@ public class VersionsResource extends ApplicationResource {
                 versionControlInfoDto.setRegistryId(requestEntity.getRegistryId());
                 versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(requestEntity.getRegistryId()));
 
-                final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false);
+                final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false,
+                    entity.getUpdateDescendantVersionedFlows());
                 final VersionControlInformationDTO updatedVci = updatedGroup.getComponent().getVersionControlInformation();
 
                 final VersionControlInformationEntity responseEntity = new VersionControlInformationEntity();
@@ -1039,7 +1003,7 @@ public class VersionsResource extends ApplicationResource {
         // 14. Re-Start all Processors, Funnels, Ports that are affected and not removed.
 
         // Step 0: Get the Versioned Flow Snapshot from the Flow Registry
-        final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation());
+        final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true);
 
         // The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
         // the flow snapshot to contain compatible bundles.
@@ -1085,7 +1049,7 @@ public class VersionsResource extends ApplicationResource {
                 final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
                     try {
                         final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
-                            affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, true);
+                            affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, true, true);
 
                         vcur.markComplete(updatedVersionControlEntity);
                     } catch (final LifecycleManagementException e) {
@@ -1188,7 +1152,7 @@ public class VersionsResource extends ApplicationResource {
         final String idGenerationSeed = getIdGenerationSeed().orElse(null);
 
         // Step 0: Get the Versioned Flow Snapshot from the Flow Registry
-        final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation());
+        final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), false);
 
         // The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
         // the flow snapshot to contain compatible bundles.
@@ -1221,8 +1185,7 @@ public class VersionsResource extends ApplicationResource {
             () -> {
                 // Step 3: Verify that all components in the snapshot exist on all nodes
                 // Step 4: Verify that Process Group is already under version control. If not, must start Version Control instead of updating flow
-                // Step 5: Verify that Process Group is not 'dirty'
-                serviceFacade.verifyCanUpdate(groupId, flowSnapshot, false, false);
+                serviceFacade.verifyCanRevertLocalModifications(groupId, flowSnapshot);
             },
             (revision, processGroupEntity) -> {
                 // Ensure that the information passed in is correct
@@ -1254,7 +1217,7 @@ public class VersionsResource extends ApplicationResource {
                 final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
                     try {
                         final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
-                            affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false);
+                            affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false, false);
 
                         vcur.markComplete(updatedVersionControlEntity);
                     } catch (final LifecycleManagementException e) {
@@ -1288,7 +1251,7 @@ public class VersionsResource extends ApplicationResource {
     private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri,
         final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user, final boolean replicateRequest, final VersionControlInformationEntity requestEntity,
         final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest, final String idGenerationSeed,
-        final boolean verifyNotModified) throws LifecycleManagementException {
+        final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException {
 
         // Steps 6-7: Determine which components must be stopped and stop them.
         final Set<String> stoppableReferenceTypes = new HashSet<>();
@@ -1302,7 +1265,8 @@ public class VersionsResource extends ApplicationResource {
             .collect(Collectors.toSet());
 
         logger.info("Stopping {} Processors", runningComponents.size());
-        final Pause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        asyncRequest.setCancelCallback(stopComponentsPause::cancel);
         componentLifecycle.scheduleComponents(exampleUri, user, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause);
 
         if (asyncRequest.isCancelled()) {
@@ -1317,7 +1281,8 @@ public class VersionsResource extends ApplicationResource {
             .collect(Collectors.toSet());
 
         logger.info("Disabling {} Controller Services", enabledServices.size());
-        final Pause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        asyncRequest.setCancelCallback(disableServicesPause::cancel);
         componentLifecycle.activateControllerServices(exampleUri, user, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause);
 
         if (asyncRequest.isCancelled()) {
@@ -1328,96 +1293,113 @@ public class VersionsResource extends ApplicationResource {
         logger.info("Updating Process Group with ID {} to version {} of the Versioned Flow", groupId, flowSnapshot.getSnapshotMetadata().getVersion());
         // If replicating request, steps 10-12 are performed on each node individually, and this is accomplished
         // by replicating a PUT to /nifi-api/versions/process-groups/{groupId}
-        if (replicateRequest) {
+        try {
+            if (replicateRequest) {
 
-            final URI updateUri;
-            try {
-                updateUri = new URI(exampleUri.getScheme(), exampleUri.getUserInfo(), exampleUri.getHost(),
-                    exampleUri.getPort(), "/nifi-api/versions/process-groups/" + groupId, null, exampleUri.getFragment());
-            } catch (URISyntaxException e) {
-                throw new RuntimeException(e);
-            }
+                final URI updateUri;
+                try {
+                    updateUri = new URI(exampleUri.getScheme(), exampleUri.getUserInfo(), exampleUri.getHost(),
+                        exampleUri.getPort(), "/nifi-api/versions/process-groups/" + groupId, null, exampleUri.getFragment());
+                } catch (URISyntaxException e) {
+                    throw new RuntimeException(e);
+                }
 
-            final Map<String, String> headers = new HashMap<>();
-            headers.put("content-type", MediaType.APPLICATION_JSON);
+                final Map<String, String> headers = new HashMap<>();
+                headers.put("content-type", MediaType.APPLICATION_JSON);
 
-            final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity();
-            snapshotEntity.setProcessGroupRevision(requestEntity.getProcessGroupRevision());
-            snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId());
-            snapshotEntity.setVersionedFlow(flowSnapshot);
+                final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity();
+                snapshotEntity.setProcessGroupRevision(requestEntity.getProcessGroupRevision());
+                snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId());
+                snapshotEntity.setVersionedFlow(flowSnapshot);
+                snapshotEntity.setUpdateDescendantVersionedFlows(updateDescendantVersionedFlows);
 
-            final NodeResponse clusterResponse;
-            try {
-                if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
-                    clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse();
-                } else {
-                    clusterResponse = getRequestReplicator().forwardToCoordinator(
-                        getClusterCoordinatorNode(), user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse();
+                final NodeResponse clusterResponse;
+                try {
+                    logger.debug("Replicating PUT request to {} for user {}", updateUri, user);
+
+                    if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                        clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse();
+                    } else {
+                        clusterResponse = getRequestReplicator().forwardToCoordinator(
+                            getClusterCoordinatorNode(), user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse();
+                    }
+                } catch (final InterruptedException ie) {
+                    logger.warn("Interrupted while replicating PUT request to {} for user {}", updateUri, user);
+                    Thread.currentThread().interrupt();
+                    throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
                 }
-            } catch (final InterruptedException ie) {
-                Thread.currentThread().interrupt();
-                throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
-            }
 
-            final int disableServicesStatus = clusterResponse.getStatus();
-            if (disableServicesStatus != Status.OK.getStatusCode()) {
-                final String explanation = getResponseEntity(clusterResponse, String.class);
-                throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation);
+                final int updateFlowStatus = clusterResponse.getStatus();
+                if (updateFlowStatus != Status.OK.getStatusCode()) {
+                    final String explanation = getResponseEntity(clusterResponse, String.class);
+                    logger.error("Failed to update flow across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}",
+                        updateUri, user, updateFlowStatus, explanation);
+                    throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation);
+                }
+
+            } else {
+                // Step 10: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot,
+                // that it has no data in it. Ensure that no Input Port was removed, unless it currently has no incoming connections.
+                // Ensure that no Output Port was removed, unless it currently has no outgoing connections.
+                serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, verifyNotModified);
+
+                // Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed
+                final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
+                final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId);
+                final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation();
+
+                final Bucket bucket = flowSnapshot.getBucket();
+                final VersionedFlow flow = flowSnapshot.getFlow();
+
+                final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
+                final VersionControlInformationDTO vci = new VersionControlInformationDTO();
+                vci.setBucketId(metadata.getBucketIdentifier());
+                vci.setBucketName(bucket.getName());
+                vci.setCurrent(flowSnapshot.isLatest());
+                vci.setFlowDescription(flow.getDescription());
+                vci.setFlowId(flow.getIdentifier());
+                vci.setFlowName(flow.getName());
+                vci.setGroupId(groupId);
+                vci.setModified(false);
+                vci.setRegistryId(requestVci.getRegistryId());
+                vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId()));
+                vci.setVersion(metadata.getVersion());
+
+                serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false, updateDescendantVersionedFlows);
             }
+        } finally {
+            if (!asyncRequest.isCancelled()) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Re-Enabling {} Controller Services: {}", enabledServices.size(), enabledServices);
+                }
 
-        } else {
-            // Step 10: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot,
-            // that it has no data in it. Ensure that no Input Port was removed, unless it currently has no incoming connections.
-            // Ensure that no Output Port was removed, unless it currently has no outgoing connections.
-            serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, verifyNotModified);
-
-            // Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed
-            final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
-            final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId);
-            final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation();
-
-            final Bucket bucket = flowSnapshot.getBucket();
-            final VersionedFlow flow = flowSnapshot.getFlow();
-
-            final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
-            final VersionControlInformationDTO vci = new VersionControlInformationDTO();
-            vci.setBucketId(metadata.getBucketIdentifier());
-            vci.setBucketName(bucket.getName());
-            vci.setCurrent(flowSnapshot.isLatest());
-            vci.setFlowDescription(flow.getDescription());
-            vci.setFlowId(flow.getIdentifier());
-            vci.setFlowName(flow.getName());
-            vci.setGroupId(groupId);
-            vci.setModified(false);
-            vci.setRegistryId(requestVci.getRegistryId());
-            vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId()));
-            vci.setVersion(metadata.getVersion());
-
-            serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false);
-        }
+                asyncRequest.update(new Date(), "Re-Enabling Controller Services", 60);
 
-        if (asyncRequest.isCancelled()) {
-            return null;
-        }
-        asyncRequest.update(new Date(), "Re-Enabling Controller Services", 60);
+                // Step 13. Re-enable all disabled controller services
+                final CancellableTimedPause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+                asyncRequest.setCancelCallback(enableServicesPause::cancel);
+                final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user);
+                logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
+                componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause);
+            }
 
-        // Step 13. Re-enable all disabled controller services
-        final Pause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-        final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user);
-        logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
-        componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause);
+            if (!asyncRequest.isCancelled()) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Restart {} Processors: {}", runningComponents.size(), runningComponents);
+                }
 
-        if (asyncRequest.isCancelled()) {
-            return null;
-        }
-        asyncRequest.update(new Date(), "Restarting Processors", 80);
+                asyncRequest.update(new Date(), "Restarting Processors", 80);
 
-        // Step 14. Restart all components
-        final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents, user);
-        final Pause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-        logger.info("Restarting {} Processors", componentsToStart.size());
-        componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause);
+                // Step 14. Restart all components
+                final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents, user);
+                final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+                asyncRequest.setCancelCallback(startComponentsPause::cancel);
+                logger.info("Restarting {} Processors", componentsToStart.size());
+                componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause);
+            }
+        }
 
+        asyncRequest.setCancelCallback(null);
         if (asyncRequest.isCancelled()) {
             return null;
         }
@@ -1426,6 +1408,7 @@ public class VersionsResource extends ApplicationResource {
         return serviceFacade.getVersionControlInformation(groupId);
     }
 
+
     /**
      * Extracts the response entity from the specified node response.
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
index 1309eee..3cecdeb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
@@ -99,4 +99,12 @@ public interface AsynchronousWebRequest<T> {
      * @return <code>true</code> if the request has been canceled, <code>false</code> otherwise
      */
     boolean isCancelled();
+
+    /**
+     * Sets the cancel callback to the given runnable, so that if {@link #cancel()} is called, the given {@link Runnable} will be triggered.
+     * If <code>null</code> is passed, no operation will be triggered when the task is cancelled.
+     *
+     * @param runnable the callback
+     */
+    void setCancelCallback(Runnable runnable);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
index 4810a32..8e2e221 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
@@ -34,6 +34,7 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
     private volatile String failureReason;
     private volatile boolean cancelled;
     private volatile T results;
+    private volatile Runnable cancelCallback;
 
     public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user, final String state) {
         this.id = requestId;
@@ -57,6 +58,11 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
     }
 
     @Override
+    public void setCancelCallback(final Runnable runnable) {
+        this.cancelCallback = runnable;
+    }
+
+    @Override
     public void markComplete(final T results) {
         this.complete = true;
         this.results = results;
@@ -130,6 +136,12 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
         percentComplete = 100;
         state = "Canceled by user";
         setFailureReason("Request cancelled by user");
+        // The callback is optional: setCancelCallback(null) is allowed and means "do nothing on cancel".
+        // Read the volatile field once so the null check and the invocation use the same reference.
+        final Runnable callback = cancelCallback;
+        if (callback != null) {
+            callback.run();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 6077268..7d40473 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -118,6 +118,7 @@ import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.registry.flow.VersionedComponent;
 import org.apache.nifi.registry.flow.diff.FlowComparison;
 import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedFunnel;
@@ -2202,15 +2203,32 @@ public final class DtoFactory {
 
+    /**
+     * Creates a DTO that identifies the component to which the given difference applies.
+     * Component B is used instead of Component A whenever Component A is absent or Component B
+     * is an InstantiatedVersionedComponent, so that the instance ID and instance group ID can be reported.
+     *
+     * @param difference the difference whose component should be described
+     * @return a DTO holding the component's ID, name, type, and Process Group ID
+     */
     private ComponentDifferenceDTO createComponentDifference(final FlowDifference difference) {
         VersionedComponent component = difference.getComponentA();
-        if (component == null) {
+        if (component == null || difference.getComponentB() instanceof InstantiatedVersionedComponent) {
             component = difference.getComponentB();
         }
 
         final ComponentDifferenceDTO dto = new ComponentDifferenceDTO();
-        dto.setComponentId(component.getIdentifier());
         dto.setComponentName(component.getName());
         dto.setComponentType(component.getComponentType().name());
-        dto.setProcessGroupId(dto.getProcessGroupId());
+
+        if (component instanceof InstantiatedVersionedComponent) {
+            final InstantiatedVersionedComponent instantiatedComponent = (InstantiatedVersionedComponent) component;
+            dto.setComponentId(instantiatedComponent.getInstanceId());
+            dto.setProcessGroupId(instantiatedComponent.getInstanceGroupId());
+        } else {
+            // No instance information is available, so fall back to the versioned identifiers.
+            dto.setComponentId(component.getIdentifier());
+            dto.setProcessGroupId(component.getGroupIdentifier());
+        }
+
         return dto;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
index 7cf61ea..9259bf4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
@@ -114,10 +114,12 @@ public interface ProcessGroupDAO {
      * @param versionControlInformation the new Version Control Information
      * @param componentIdSeed the seed value to use for generating ID's for new components
      * @param updateSettings whether or not to update the process group's name and position
+     * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
+     *            update the contents of that Process Group
      * @return the process group
      */
     ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed,
-        boolean verifyNotModified, boolean updateSettings);
+        boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows);
 
     /**
      * Applies the given Version Control Information to the Process Group

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index e3c4725..bb7edb1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -29,6 +29,8 @@ import org.apache.nifi.registry.flow.FlowRegistry;
 import org.apache.nifi.registry.flow.StandardVersionControlInformation;
 import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
@@ -244,8 +246,12 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
         final FlowRegistry flowRegistry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
         final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
 
+        final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
+        final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController.getFlowRegistryClient(), false);
+
         final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
             .registryName(registryName)
+            .flowSnapshot(flowSnapshot)
             .modified(false)
             .current(true)
             .build();
@@ -264,9 +270,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
 
     @Override
     public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation,
-        final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) {
+        final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
         final ProcessGroup group = locateProcessGroup(flowController, groupId);
-        group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings);
+        group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows);
 
         final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
             .flowSnapshot(proposedSnapshot.getFlowContents())

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
index 1f83a6f..dea43f6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
@@ -43,7 +43,7 @@ public class CancellableTimedPause implements Pause {
 
         long sysTime = System.nanoTime();
         final long maxWaitTime = System.nanoTime() + pauseNanos;
-        while (sysTime < maxWaitTime) {
+        while (sysTime < maxWaitTime && !cancelled) {
             try {
                 TimeUnit.NANOSECONDS.sleep(pauseNanos);
             } catch (final InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
index 4469ea1..11bd1b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
@@ -421,18 +421,24 @@ public final class SnippetUtils {
         }
 
         // get a list of all names of process groups so that we can rename as needed.
-        final List<String> groupNames = new ArrayList<>();
+        final Set<String> groupNames = new HashSet<>();
         for (final ProcessGroup childGroup : group.getProcessGroups()) {
             groupNames.add(childGroup.getName());
         }
 
         if (snippetContents.getProcessGroups() != null) {
             for (final ProcessGroupDTO groupDTO : snippetContents.getProcessGroups()) {
-                String groupName = groupDTO.getName();
-                while (groupNames.contains(groupName)) {
-                    groupName = "Copy of " + groupName;
+                // If Version Control Information is present, then we don't want to rename the
+                // Process Group - we want it to remain the same as the one in Version Control.
+                // However, in order to disambiguate things, we generally do want to rename to
+                // 'Copy of...' so we do this only if there is no Version Control Information present.
+                if (groupDTO.getVersionControlInformation() == null) {
+                    String groupName = groupDTO.getName();
+                    while (groupNames.contains(groupName)) {
+                        groupName = "Copy of " + groupName;
+                    }
+                    groupDTO.setName(groupName);
                 }
-                groupDTO.setName(groupName);
                 groupNames.add(groupDTO.getName());
             }
         }