You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:11 UTC

[23/50] nifi git commit: NIFI-4436: Integrate with actual Flow Registry via REST Client - Store Bucket Name, Flow Name, Flow Description for VersionControlInformation - Added endpoint for determining local modifications to a process group - Updated autho[...] (subject line truncated by the mail archive)

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index b010bf3..b808ae6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -17,15 +17,40 @@
 
 package org.apache.nifi.web.api;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
 import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.authorization.AuthorizableLookup;
 import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.ProcessGroupAuthorizable;
 import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.user.NiFiUser;
@@ -34,7 +59,6 @@ import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.registry.flow.ComponentType;
 import org.apache.nifi.registry.flow.FlowRegistryUtils;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
@@ -55,9 +79,9 @@ import org.apache.nifi.web.api.dto.VersionedFlowDTO;
 import org.apache.nifi.web.api.dto.VersionedFlowUpdateRequestDTO;
 import org.apache.nifi.web.api.entity.AffectedComponentEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
 import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
 import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
-import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowSnapshotEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
@@ -70,37 +94,12 @@ import org.apache.nifi.web.util.Pause;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedHashMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
 
 @Path("/versions")
 @Api(value = "/versions", description = "Endpoint for managing version control for a flow")
@@ -356,7 +355,10 @@ public class VersionsResource extends ApplicationResource {
             response = VersionControlInformationEntity.class,
             notes = NON_GUARANTEED_ENDPOINT,
             authorizations = {
-                @Authorization(value = "Read - /process-groups/{uuid}")
+                @Authorization(value = "Read - /process-groups/{uuid}"),
+                @Authorization(value = "Write - /process-groups/{uuid}"),
+                @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"),
+                @Authorization(value = "Read - any referenced Controller Services by any encapsulated components - /controller-services/{uuid}")
             })
     @ApiResponses(value = {
         @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@@ -500,9 +502,11 @@ public class VersionsResource extends ApplicationResource {
             requestEntity,
             groupRevision,
             lookup -> {
-                final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
+                final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+                final Authorizable processGroup = groupAuthorizable.getAuthorizable();
                 processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
                 processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+                super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
             },
             () -> {
                 final VersionControlInformationEntity entity = serviceFacade.getVersionControlInformation(groupId);
@@ -663,22 +667,21 @@ public class VersionsResource extends ApplicationResource {
             throw new IllegalArgumentException("The Flow ID must be supplied.");
         }
 
-
         // Perform the request
         if (isReplicateRequest()) {
             return replicate(HttpMethod.PUT, requestEntity);
         }
 
-        // Determine which components will be affected by updating the version
-        final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, NiFiUserUtils.getNiFiUser());
-
         final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
         return withWriteLock(
             serviceFacade,
             requestEntity,
             requestRevision,
             lookup -> {
-                authorizeAffectedComponents(lookup, affectedComponents);
+                final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+                final Authorizable processGroup = groupAuthorizable.getAuthorizable();
+                processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+                processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
             },
             () -> {
                 // We do not enforce that the Process Group is 'not dirty' because at this point,
@@ -691,13 +694,16 @@ public class VersionsResource extends ApplicationResource {
                 // Update the Process Group to match the proposed flow snapshot
                 final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO();
                 versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier());
+                versionControlInfoDto.setBucketName(snapshotMetadata.getBucketName());
                 versionControlInfoDto.setCurrent(true);
                 versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier());
                 versionControlInfoDto.setFlowName(snapshotMetadata.getFlowName());
+                versionControlInfoDto.setFlowDescription(snapshotMetadata.getFlowDescription());
                 versionControlInfoDto.setGroupId(groupId);
                 versionControlInfoDto.setModified(false);
                 versionControlInfoDto.setVersion(snapshotMetadata.getVersion());
                 versionControlInfoDto.setRegistryId(requestEntity.getRegistryId());
+                versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(requestEntity.getRegistryId()));
 
                 final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false);
                 final VersionControlInformationDTO updatedVci = updatedGroup.getComponent().getVersionControlInformation();
@@ -769,6 +775,13 @@ public class VersionsResource extends ApplicationResource {
         updateRequestDto.setProcessGroupId(asyncRequest.getProcessGroupId());
         updateRequestDto.setRequestId(requestId);
         updateRequestDto.setUri(generateResourceUri("versions", requestType, requestId));
+        updateRequestDto.setState(asyncRequest.getState());
+        updateRequestDto.setPercentComplete(asyncRequest.getPercentComplete());
+
+        if (updateRequestDto.isComplete()) {
+            final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(asyncRequest.getProcessGroupId());
+            updateRequestDto.setVersionControlInformation(vciEntity == null ? null : vciEntity.getVersionControlInformation());
+        }
 
         final RevisionDTO groupRevision = serviceFacade.getProcessGroup(asyncRequest.getProcessGroupId()).getRevision();
 
@@ -830,6 +843,13 @@ public class VersionsResource extends ApplicationResource {
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
 
         final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest = requestManager.removeRequest(requestType, requestId, user);
+        if (asyncRequest == null) {
+            throw new ResourceNotFoundException("Could not find request of type " + requestType + " with ID " + requestId);
+        }
+
+        if (!asyncRequest.isComplete()) {
+            asyncRequest.cancel();
+        }
 
         final VersionedFlowUpdateRequestDTO updateRequestDto = new VersionedFlowUpdateRequestDTO();
         updateRequestDto.setComplete(asyncRequest.isComplete());
@@ -838,6 +858,13 @@ public class VersionsResource extends ApplicationResource {
         updateRequestDto.setProcessGroupId(asyncRequest.getProcessGroupId());
         updateRequestDto.setRequestId(requestId);
         updateRequestDto.setUri(generateResourceUri("versions", requestType, requestId));
+        updateRequestDto.setPercentComplete(asyncRequest.getPercentComplete());
+        updateRequestDto.setState(asyncRequest.getState());
+
+        if (updateRequestDto.isComplete()) {
+            final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(asyncRequest.getProcessGroupId());
+            updateRequestDto.setVersionControlInformation(vciEntity == null ? null : vciEntity.getVersionControlInformation());
+        }
 
         final RevisionDTO groupRevision = serviceFacade.getProcessGroup(asyncRequest.getProcessGroupId()).getRevision();
 
@@ -861,7 +888,10 @@ public class VersionsResource extends ApplicationResource {
             notes = NON_GUARANTEED_ENDPOINT,
             authorizations = {
                 @Authorization(value = "Read - /process-groups/{uuid}"),
-                @Authorization(value = "Write - /process-groups/{uuid}")
+                @Authorization(value = "Write - /process-groups/{uuid}"),
+                @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"),
+                @Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated components"),
+                @Authorization(value = "Write - if the template contains any restricted components - /restricted-components")
             })
     @ApiResponses(value = {
         @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@@ -924,7 +954,7 @@ public class VersionsResource extends ApplicationResource {
         //    a. Component itself is modified in some way, other than position changing.
         //    b. Source and Destination of any Connection that is modified.
         //    c. Any Processor or Controller Service that references a Controller Service that is modified.
-        // 2. Verify READ and WRITE permissions for user, for every component affected.
+        // 2. Verify READ and WRITE permissions for user, for every component.
         // 3. Verify that all components in the snapshot exist on all nodes (i.e., the NAR exists)?
         // 4. Verify that Process Group is already under version control. If not, must start Version Control instead of updateFlow
         // 5. Verify that Process Group is not 'dirty'.
@@ -961,8 +991,13 @@ public class VersionsResource extends ApplicationResource {
             requestEntity,
             requestRevision,
             lookup -> {
-                // Step 2: Verify READ and WRITE permissions for user, for every component affected.
-                authorizeAffectedComponents(lookup, affectedComponents);
+                // Step 2: Verify READ and WRITE permissions for user, for every component.
+                final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+                final Authorizable processGroup = groupAuthorizable.getAuthorizable();
+                processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+                processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+                super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
+                super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, false, true, true);
 
                 final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
                 final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents);
@@ -980,7 +1015,7 @@ public class VersionsResource extends ApplicationResource {
                 // Create an asynchronous request that will occur in the background, because this request may
                 // result in stopping components, which can take an indeterminate amount of time.
                 final String requestId = UUID.randomUUID().toString();
-                final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user);
+                final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Processors");
 
                 // Submit the request to be performed in the background
                 final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
@@ -1005,6 +1040,8 @@ public class VersionsResource extends ApplicationResource {
                 updateRequestDto.setProcessGroupId(groupId);
                 updateRequestDto.setRequestId(requestId);
                 updateRequestDto.setUri(generateResourceUri("versions", "update-requests", requestId));
+                updateRequestDto.setPercentComplete(request.getPercentComplete());
+                updateRequestDto.setState(request.getState());
 
                 final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity();
                 final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revision);
@@ -1029,7 +1066,10 @@ public class VersionsResource extends ApplicationResource {
             notes = NON_GUARANTEED_ENDPOINT,
             authorizations = {
                 @Authorization(value = "Read - /process-groups/{uuid}"),
-                @Authorization(value = "Write - /process-groups/{uuid}")
+                @Authorization(value = "Write - /process-groups/{uuid}"),
+                @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"),
+                @Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated components"),
+                @Authorization(value = "Write - if the template contains any restricted components - /restricted-components")
             })
     @ApiResponses(value = {
         @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@@ -1095,8 +1135,13 @@ public class VersionsResource extends ApplicationResource {
             requestEntity,
             requestRevision,
             lookup -> {
-                // Step 2: Verify READ and WRITE permissions for user, for every component affected.
-                authorizeAffectedComponents(lookup, affectedComponents);
+                // Step 2: Verify READ and WRITE permissions for user, for every component.
+                final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+                final Authorizable processGroup = groupAuthorizable.getAuthorizable();
+                processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+                processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+                super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
+                super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, false, true, true);
 
                 final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
                 final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents);
@@ -1134,7 +1179,7 @@ public class VersionsResource extends ApplicationResource {
                 // If the information passed in is correct, but there have been no changes, there is nothing to do - just register the request, mark it complete, and return.
                 if (currentVersion.getModified() == Boolean.FALSE) {
                     final String requestId = UUID.randomUUID().toString();
-                    final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user);
+                    final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Complete");
                     requestManager.submitRequest("revert-requests", requestId, request, task -> {
                     });
 
@@ -1145,7 +1190,10 @@ public class VersionsResource extends ApplicationResource {
                     updateRequestDto.setLastUpdated(new Date());
                     updateRequestDto.setProcessGroupId(groupId);
                     updateRequestDto.setRequestId(requestId);
+                    updateRequestDto.setVersionControlInformation(currentVersion);
                     updateRequestDto.setUri(generateResourceUri("versions", "revert-requests", requestId));
+                    updateRequestDto.setPercentComplete(100);
+                    updateRequestDto.setState(request.getState());
 
                     final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity();
                     updateRequestEntity.setProcessGroupRevision(revisionDto);
@@ -1159,19 +1207,18 @@ public class VersionsResource extends ApplicationResource {
                 // Create an asynchronous request that will occur in the background, because this request may
                 // result in stopping components, which can take an indeterminate amount of time.
                 final String requestId = UUID.randomUUID().toString();
-                final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user);
+                final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Processors");
 
                 // Submit the request to be performed in the background
                 final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
                     try {
-                        // TODO: change the URI to the new endpoint for 'revert' instead of 'change version'
                         final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
                             affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false);
 
                         vcur.markComplete(updatedVersionControlEntity);
                     } catch (final LifecycleManagementException e) {
                         logger.error("Failed to update flow to new version", e);
-                        vcur.setFailureReason("Failed to update flow to new version due to " + e);
+                        vcur.setFailureReason("Failed to update flow to new version due to " + e.getMessage());
                     }
                 };
 
@@ -1201,7 +1248,6 @@ public class VersionsResource extends ApplicationResource {
         final boolean verifyNotModified) throws LifecycleManagementException {
 
         // Steps 6-7: Determine which components must be stopped and stop them.
-        // Do we need to stop other types? Input Ports, Output Ports, Funnels, RPGs, etc.
         final Set<String> stoppableReferenceTypes = new HashSet<>();
         stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
         stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT);
@@ -1215,7 +1261,11 @@ public class VersionsResource extends ApplicationResource {
         logger.info("Stopping {} Processors", runningComponents.size());
         final Pause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
         componentLifecycle.scheduleComponents(exampleUri, user, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause);
-        asyncRequest.setLastUpdated(new Date());
+
+        if (asyncRequest.isCancelled()) {
+            return null;
+        }
+        asyncRequest.update(new Date(), "Disabling Affected Controller Services", 20);
 
         // Steps 8-9. Disable enabled controller services that are affected
         final Set<AffectedComponentEntity> enabledServices = affectedComponents.stream()
@@ -1226,7 +1276,11 @@ public class VersionsResource extends ApplicationResource {
         logger.info("Disabling {} Controller Services", enabledServices.size());
         final Pause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
         componentLifecycle.activateControllerServices(exampleUri, user, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause);
-        asyncRequest.setLastUpdated(new Date());
+
+        if (asyncRequest.isCancelled()) {
+            return null;
+        }
+        asyncRequest.update(new Date(), "Updating Flow", 40);
 
         logger.info("Updating Process Group with ID {} to version {} of the Versioned Flow", groupId, flowSnapshot.getSnapshotMetadata().getVersion());
         // If replicating request, steps 10-12 are performed on each node individually, and this is accomplished
@@ -1281,21 +1335,32 @@ public class VersionsResource extends ApplicationResource {
             serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false);
         }
 
-        asyncRequest.setLastUpdated(new Date());
+        if (asyncRequest.isCancelled()) {
+            return null;
+        }
+        asyncRequest.update(new Date(), "Re-Enabling Controller Services", 60);
 
         // Step 13. Re-enable all disabled controller services
         final Pause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
         final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user);
         logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
         componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause);
-        asyncRequest.setLastUpdated(new Date());
+
+        if (asyncRequest.isCancelled()) {
+            return null;
+        }
+        asyncRequest.update(new Date(), "Restarting Processors", 80);
 
         // Step 14. Restart all components
         final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents, user);
         final Pause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
         logger.info("Restarting {} Processors", componentsToStart.size());
         componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause);
-        asyncRequest.setLastUpdated(new Date());
+
+        if (asyncRequest.isCancelled()) {
+            return null;
+        }
+        asyncRequest.update(new Date(), "Complete", 100);
 
         return serviceFacade.getVersionControlInformation(groupId);
     }
@@ -1318,42 +1383,6 @@ public class VersionsResource extends ApplicationResource {
     }
 
 
-    private void authorizeAffectedComponents(final AuthorizableLookup lookup, final Set<AffectedComponentEntity> affectedComponents) {
-        final Map<String, List<AffectedComponentEntity>> componentsByType = affectedComponents.stream()
-            .collect(Collectors.groupingBy(entity -> entity.getComponent().getReferenceType()));
-
-        authorize(componentsByType.get(ComponentType.PROCESSOR.name()), id -> lookup.getProcessor(id).getAuthorizable());
-        authorize(componentsByType.get(ComponentType.CONTROLLER_SERVICE.name()), id -> lookup.getControllerService(id).getAuthorizable());
-
-        authorize(componentsByType.get(ComponentType.CONNECTION.name()), id -> lookup.getConnection(id).getAuthorizable());
-        authorize(componentsByType.get(ComponentType.FUNNEL.name()), id -> lookup.getFunnel(id));
-        authorize(componentsByType.get(ComponentType.INPUT_PORT.name()), id -> lookup.getInputPort(id));
-        authorize(componentsByType.get(ComponentType.OUTPUT_PORT.name()), id -> lookup.getOutputPort(id));
-        authorize(componentsByType.get(ComponentType.LABEL.name()), id -> lookup.getLabel(id));
-
-        authorize(componentsByType.get(ComponentType.PROCESS_GROUP.name()), id -> lookup.getProcessGroup(id).getAuthorizable());
-        authorize(componentsByType.get(ComponentType.REMOTE_PROCESS_GROUP.name()), id -> lookup.getRemoteProcessGroup(id));
-
-
-        // Remote Input Ports and Remote Output Ports are not authorized independently but rather at the Remote Process Group level,
-        // so we have to treat these a little differently.
-        componentsByType.getOrDefault(ComponentType.REMOTE_INPUT_PORT.name(), Collections.emptyList()).stream()
-            .forEach(affectedPort -> {
-                final String rpgId = affectedPort.getComponent().getProcessGroupId();
-                final Authorizable rpg = lookup.getRemoteProcessGroup(rpgId);
-                rpg.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
-                rpg.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
-            });
-
-        componentsByType.getOrDefault(ComponentType.REMOTE_OUTPUT_PORT.name(), Collections.emptyList()).stream()
-            .forEach(affectedPort -> {
-                final String rpgId = affectedPort.getComponent().getProcessGroupId();
-                final Authorizable rpg = lookup.getRemoteProcessGroup(rpgId);
-                rpg.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
-                rpg.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
-            });
-    }
-
     private Set<AffectedComponentEntity> getUpdatedEntities(final Set<AffectedComponentEntity> originalEntities, final NiFiUser user) {
         final Set<AffectedComponentEntity> entities = new LinkedHashSet<>();
 
@@ -1373,17 +1402,6 @@ public class VersionsResource extends ApplicationResource {
     }
 
 
-    private void authorize(final List<AffectedComponentEntity> componentDtos, final Function<String, Authorizable> authFunction) {
-        if (componentDtos != null) {
-            for (final AffectedComponentEntity entity : componentDtos) {
-                final Authorizable authorizable = authFunction.apply(entity.getComponent().getId());
-                authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
-                authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
-            }
-        }
-    }
-
-
     public void setServiceFacade(NiFiServiceFacade serviceFacade) {
         this.serviceFacade = serviceFacade;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
index 4b87b50..5dcb125 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
@@ -110,7 +110,6 @@ public class AsyncRequestManager<T> implements RequestManager<T> {
                 } catch (final Exception e) {
                     logger.error("Failed to perform asynchronous task", e);
                     request.setFailureReason("Encountered unexpected error when performing asynchronous task: " + e);
-                    request.setLastUpdated(new Date());
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
index 2c14008..1309eee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
@@ -39,11 +39,23 @@ public interface AsynchronousWebRequest<T> {
     Date getLastUpdated();
 
     /**
-     * Updates the Date at which the status of this request was last updated
+     * @return the current state of the request
+     */
+    public String getState();
+
+    /**
+     * @return the current percent complete, between 0 and 100 (inclusive)
+     */
+    public int getPercentComplete();
+
+    /**
+     * Updates the request to indicate the new state and percent complete
      *
-     * @param date the date at which the status of this request was last updated
+     * @param date the last updated time
+     * @param state the new state
+     * @param percentComplete The percentage complete, between 0 and 100 (inclusive)
      */
-    void setLastUpdated(Date date);
+    void update(Date date, String state, int percentComplete);
 
     /**
      * @return the user who submitted the request
@@ -77,4 +89,14 @@ public interface AsynchronousWebRequest<T> {
      * @return the results of the request, if it completed successfully, or <code>null</code> if the request either has no completed or failed
      */
     T getResults();
+
+    /**
+     * Cancels the request so that no more steps can be completed
+     */
+    void cancel();
+
+    /**
+     * @return <code>true</code> if the request has been canceled, <code>false</code> otherwise
+     */
+    boolean isCancelled();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
index 8ba9a58..4810a32 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
@@ -29,13 +29,17 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
 
     private volatile boolean complete = false;
     private volatile Date lastUpdated = new Date();
+    private volatile String state;
+    private volatile int percentComplete;
     private volatile String failureReason;
+    private volatile boolean cancelled;
     private volatile T results;
 
-    public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user) {
+    public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user, final String state) {
         this.id = requestId;
         this.processGroupId = processGroupId;
         this.user = user;
+        this.state = state;
     }
 
     public String getRequestId() {
@@ -57,6 +61,8 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
         this.complete = true;
         this.results = results;
         this.lastUpdated = new Date();
+        this.percentComplete = 100;
+        this.state = "Complete";
     }
 
     @Override
@@ -65,8 +71,34 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
     }
 
     @Override
-    public void setLastUpdated(final Date date) {
-        this.lastUpdated = lastUpdated;
+    public String getState() {
+        return state;
+    }
+
+    @Override
+    public int getPercentComplete() {
+        return percentComplete;
+    }
+
+    @Override
+    public void update(Date date, String state, int percentComplete) {
+        if (percentComplete < 0 || percentComplete > 100) {
+            throw new IllegalArgumentException("Cannot set percent complete to a value of " + percentComplete + "; it must be between 0 and 100.");
+        }
+
+        if (isCancelled()) {
+            throw new IllegalStateException("Cannot update state because request has already been cancelled by user");
+        }
+
+        if (isComplete()) {
+            final String failure = getFailureReason();
+            final String explanation = failure == null ? "successfully" : "with failure reason: " + failure;
+            throw new IllegalStateException("Cannot update state to '" + state + "' because request is already completed " + explanation);
+        }
+
+        this.lastUpdated = date;
+        this.state = state;
+        this.percentComplete = percentComplete;
     }
 
     @Override
@@ -79,6 +111,7 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
         this.failureReason = Objects.requireNonNull(explanation);
         this.complete = true;
         this.results = null;
+        this.lastUpdated = new Date();
     }
 
     @Override
@@ -90,4 +123,17 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
     public T getResults() {
         return results;
     }
+
+    @Override
+    public void cancel() {
+        this.cancelled = true;
+        percentComplete = 100;
+        state = "Canceled by user";
+        setFailureReason("Request cancelled by user");
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return cancelled;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 3639b18..1c1e729 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -115,8 +115,12 @@ import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.flow.FlowRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedComponent;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedFunnel;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedLabel;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedPort;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
@@ -2174,6 +2178,38 @@ public final class DtoFactory {
         return dto;
     }
 
+
+    public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison) {
+        final Map<ComponentDifferenceDTO, List<String>> differencesByComponent = new HashMap<>();
+
+        for (final FlowDifference difference : comparison.getDifferences()) {
+            final ComponentDifferenceDTO componentDiff = createComponentDifference(difference);
+            final List<String> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>());
+            differences.add(difference.getDescription());
+        }
+
+        for (final Map.Entry<ComponentDifferenceDTO, List<String>> entry : differencesByComponent.entrySet()) {
+            entry.getKey().setDifferences(entry.getValue());
+        }
+
+        return differencesByComponent.keySet();
+    }
+
+    /** Creates a DTO identifying the component that the given difference refers to: component A, or component B when A is null. */
+    private ComponentDifferenceDTO createComponentDifference(final FlowDifference difference) {
+        VersionedComponent component = difference.getComponentA();
+        if (component == null) {
+            component = difference.getComponentB();
+        }
+
+        final ComponentDifferenceDTO dto = new ComponentDifferenceDTO();
+        dto.setComponentId(component.getIdentifier());
+        dto.setComponentName(component.getName());
+        dto.setComponentType(component.getComponentType().name());
+        dto.setProcessGroupId(component.getGroupIdentifier()); // taken from the component; the DTO's own value is still unset here
+        return dto;
+    }
+
     public VersionControlInformationDTO createVersionControlInformationDto(final ProcessGroup group) {
         if (group == null) {
             return null;
@@ -2187,10 +2223,12 @@ public final class DtoFactory {
         final VersionControlInformationDTO dto = new VersionControlInformationDTO();
         dto.setGroupId(group.getIdentifier());
         dto.setRegistryId(versionControlInfo.getRegistryIdentifier());
+        dto.setRegistryName(versionControlInfo.getRegistryName());
         dto.setBucketId(versionControlInfo.getBucketIdentifier());
+        dto.setBucketName(versionControlInfo.getBucketName());
         dto.setFlowId(versionControlInfo.getFlowIdentifier());
-        // TODO - need to get flow name here
-        dto.setFlowName(group.getName());
+        dto.setFlowName(versionControlInfo.getFlowName());
+        dto.setFlowDescription(versionControlInfo.getFlowDescription());
         dto.setVersion(versionControlInfo.getVersion());
         dto.setCurrent(versionControlInfo.getCurrent().orElse(null));
         dto.setModified(versionControlInfo.getModified().orElse(null));
@@ -2204,6 +2242,9 @@ public final class DtoFactory {
         group.getProcessors().stream()
             .map(proc -> (InstantiatedVersionedProcessor) proc)
             .forEach(proc -> mapping.put(proc.getInstanceId(), proc.getIdentifier()));
+        group.getFunnels().stream()
+            .map(funnel -> (InstantiatedVersionedFunnel) funnel)
+            .forEach(funnel -> mapping.put(funnel.getInstanceId(), funnel.getIdentifier()));
         group.getInputPorts().stream()
             .map(port -> (InstantiatedVersionedPort) port)
             .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
@@ -2224,13 +2265,17 @@ public final class DtoFactory {
             .forEach(rpg -> {
                 mapping.put(rpg.getInstanceId(), rpg.getIdentifier());
 
-                rpg.getInputPorts().stream()
-                    .map(port -> (InstantiatedVersionedRemoteGroupPort) port)
-                    .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
+                if (rpg.getInputPorts() != null) {
+                    rpg.getInputPorts().stream()
+                        .map(port -> (InstantiatedVersionedRemoteGroupPort) port)
+                        .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
+                }
 
-                rpg.getOutputPorts().stream()
-                    .map(port -> (InstantiatedVersionedRemoteGroupPort) port)
-                    .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
+                if (rpg.getOutputPorts() != null) {
+                    rpg.getOutputPorts().stream()
+                        .map(port -> (InstantiatedVersionedRemoteGroupPort) port)
+                        .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
+                }
             });
 
         group.getProcessGroups().stream()
@@ -3407,9 +3452,12 @@ public final class DtoFactory {
 
         final VersionControlInformationDTO copy = new VersionControlInformationDTO();
         copy.setRegistryId(original.getRegistryId());
+        copy.setRegistryName(original.getRegistryName());
         copy.setBucketId(original.getBucketId());
+        copy.setBucketName(original.getBucketName());
         copy.setFlowId(original.getFlowId());
         copy.setFlowName(original.getFlowName());
+        copy.setFlowDescription(original.getFlowDescription());
         copy.setVersion(original.getVersion());
         copy.setCurrent(original.getCurrent());
         copy.setModified(original.getModified());

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java
index 19f2de4..4f5af74 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java
@@ -17,8 +17,14 @@
 
 package org.apache.nifi.web.dao.impl;
 
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryException;
 import org.apache.nifi.registry.flow.FlowRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.flow.VersionedFlow;
@@ -71,37 +77,48 @@ public class FlowRegistryDAO implements RegistryDAO {
                 throw new IllegalArgumentException("The specified registry id is unknown to this NiFi.");
             }
 
-            return flowRegistry.getBuckets(user);
-        } catch (final IOException ioe) {
-            throw new NiFiCoreException("Unable to obtain bucket listing: " + ioe.getMessage(), ioe);
+            final Set<Bucket> buckets = flowRegistry.getBuckets(user);
+            final Set<Bucket> sortedBuckets = new TreeSet<>((b1, b2) -> b1.getName().compareTo(b2.getName()));
+            sortedBuckets.addAll(buckets);
+            return sortedBuckets;
+        } catch (final IOException | NiFiRegistryException ioe) {
+            throw new NiFiCoreException("Unable to obtain listing of buckets: " + ioe, ioe);
         }
     }
 
 
     @Override
     public Set<VersionedFlow> getFlowsForUser(String registryId, String bucketId, NiFiUser user) {
-        final Set<Bucket> bucketsForUser = getBucketsForUser(registryId, user);
+        try {
+            final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+            if (flowRegistry == null) {
+                throw new IllegalArgumentException("The specified registry id is unknown to this NiFi.");
+            }
 
-        // TODO - implement getBucket(bucketId, user)
-        final Bucket bucket = bucketsForUser.stream().filter(b -> b.getIdentifier().equals(bucketId)).findFirst().orElse(null);
-        if (bucket == null) {
-            throw new IllegalArgumentException("The specified bucket is not available.");
+            final Set<VersionedFlow> flows = flowRegistry.getFlows(bucketId, user);
+            final Set<VersionedFlow> sortedFlows = new TreeSet<>((f1, f2) -> f1.getName().compareTo(f2.getName()));
+            sortedFlows.addAll(flows);
+            return sortedFlows;
+        } catch (final IOException | NiFiRegistryException ioe) {
+            throw new NiFiCoreException("Unable to obtain listing of flows for bucket with ID " + bucketId + ": " + ioe, ioe);
         }
-
-        return bucket.getVersionedFlows();
     }
 
     @Override
     public Set<VersionedFlowSnapshotMetadata> getFlowVersionsForUser(String registryId, String bucketId, String flowId, NiFiUser user) {
-        final Set<VersionedFlow> flowsForUser = getFlowsForUser(registryId, bucketId, user);
+        try {
+            final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+            if (flowRegistry == null) {
+                throw new IllegalArgumentException("The specified registry id is unknown to this NiFi.");
+            }
 
-        // TODO - implement getFlow(bucketId, flowId, user)
-        final VersionedFlow versionedFlow = flowsForUser.stream().filter(vf -> vf.getIdentifier().equals(flowId)).findFirst().orElse(null);
-        if (versionedFlow == null) {
-            throw new IllegalArgumentException("The specified flow is not available.");
+            final Set<VersionedFlowSnapshotMetadata> flowVersions = flowRegistry.getFlowVersions(bucketId, flowId, user);
+            final Set<VersionedFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>((f1, f2) -> Integer.compare(f1.getVersion(), f2.getVersion()));
+            sortedFlowVersions.addAll(flowVersions);
+            return sortedFlowVersions;
+        } catch (final IOException | NiFiRegistryException ioe) {
+            throw new NiFiCoreException("Unable to obtain listing of versions for bucket with ID " + bucketId + " and flow with ID " + flowId + ": " + ioe, ioe);
         }
-
-        return versionedFlow.getSnapshotMetadata();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
index 35c537d..f830e9b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
@@ -38,7 +38,11 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO {
 
     private Port locatePort(final String portId) {
         final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
-        final Port port = rootGroup.findInputPort(portId);
+        Port port = rootGroup.findInputPort(portId);
+
+        if (port == null) {
+            port = rootGroup.findOutputPort(portId);
+        }
 
         if (port == null) {
             throw new ResourceNotFoundException(String.format("Unable to find port with id '%s'.", portId));
@@ -50,7 +54,7 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO {
     @Override
     public boolean hasPort(String portId) {
         final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
-        return rootGroup.findInputPort(portId) != null;
+        return rootGroup.findInputPort(portId) != null || rootGroup.findOutputPort(portId) != null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index 963220e..78f3e31 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -25,6 +25,7 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.FlowRegistry;
 import org.apache.nifi.registry.flow.StandardVersionControlInformation;
 import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
@@ -238,11 +239,15 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
         final ProcessGroup group = locateProcessGroup(flowController, groupId);
 
         final String registryId = versionControlInformation.getRegistryId();
-        final String bucketId = versionControlInformation.getBucketId();
-        final String flowId = versionControlInformation.getFlowId();
-        final int version = versionControlInformation.getVersion();
+        final FlowRegistry flowRegistry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
+        final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
+
+        final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
+            .registryName(registryName)
+            .modified(false)
+            .current(true)
+            .build();
 
-        final VersionControlInformation vci = new StandardVersionControlInformation(registryId, bucketId, flowId, version, null, false, true);
         group.setVersionControlInformation(vci, versionedComponentMapping);
 
         return group;
@@ -261,14 +266,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
         final ProcessGroup group = locateProcessGroup(flowController, groupId);
         group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings);
 
-        final StandardVersionControlInformation svci = new StandardVersionControlInformation(
-            versionControlInformation.getRegistryId(),
-            versionControlInformation.getBucketId(),
-            versionControlInformation.getFlowId(),
-            versionControlInformation.getVersion(),
-            proposedSnapshot.getFlowContents(),
-            versionControlInformation.getModified(),
-            versionControlInformation.getCurrent());
+        final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
+            .flowSnapshot(proposedSnapshot.getFlowContents())
+            .build();
 
         group.setVersionControlInformation(svci, Collections.emptyMap());
         return group;

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
index a6efb71..1f83a6f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
@@ -24,9 +24,10 @@ public class CancellableTimedPause implements Pause {
     private final long pauseNanos;
     private volatile boolean cancelled = false;
 
-    public CancellableTimedPause(final long pauseTime, final long expirationTime, final TimeUnit timeUnit) {
-        final long expirationNanos = TimeUnit.NANOSECONDS.convert(expirationTime, timeUnit);
-        expirationNanoTime = System.nanoTime() + expirationNanos;
+    public CancellableTimedPause(final long pauseTime, final long expirationPeriod, final TimeUnit timeUnit) {
+        final long expirationNanos = TimeUnit.NANOSECONDS.convert(expirationPeriod, timeUnit);
+        final long expirationTime = System.nanoTime() + expirationNanos;
+        expirationNanoTime = expirationTime < 0 ? Long.MAX_VALUE : expirationTime;
         pauseNanos = Math.max(1L, TimeUnit.NANOSECONDS.convert(pauseTime, timeUnit));
     }
 
@@ -44,7 +45,7 @@ public class CancellableTimedPause implements Pause {
         final long maxWaitTime = System.nanoTime() + pauseNanos;
         while (sysTime < maxWaitTime) {
             try {
-                TimeUnit.NANOSECONDS.wait(pauseNanos);
+                TimeUnit.NANOSECONDS.sleep(pauseNanos);
             } catch (final InterruptedException ie) {
                 Thread.currentThread().interrupt();
                 return false;

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 490862c..584cac6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1641,6 +1641,12 @@
                 <version>${nifi.registry.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.nifi.registry</groupId>
+                <artifactId>nifi-registry-client</artifactId>
+                <version>${nifi.registry.version}</version>
+            </dependency>
+            
+            <dependency>
                 <groupId>com.jayway.jsonpath</groupId>
                 <artifactId>json-path</artifactId>
                 <version>2.0.0</version>