You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:08 UTC

[20/50] nifi git commit: NIFI-4436: Integrate with actual Flow Registry via REST Client - Store Bucket Name, Flow Name, Flow Description for VersionControlInformation - Added endpoint for determining local modifications to a process group - Updated autho

http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index f61b399..3684f04 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -17,12 +17,37 @@
 
 package org.apache.nifi.web.api;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.authorization.ProcessGroupAuthorizable;
@@ -34,7 +59,9 @@ import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.registry.bucket.Bucket;
 import org.apache.nifi.registry.flow.FlowRegistryUtils;
+import org.apache.nifi.registry.flow.VersionedFlow;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
 import org.apache.nifi.registry.flow.VersionedProcessGroup;
@@ -69,35 +96,12 @@ import org.apache.nifi.web.util.Pause;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedHashMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
 
 @Path("/versions")
 @Api(value = "/versions", description = "Endpoint for managing version control for a flow")
@@ -125,9 +129,12 @@ public class VersionsResource extends ApplicationResource {
     @Consumes(MediaType.WILDCARD)
     @Produces(MediaType.APPLICATION_JSON)
     @Path("process-groups/{id}")
-    @ApiOperation(value = "Gets the Version Control information for a process group", response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = {
-        @Authorization(value = "Read - /process-groups/{uuid}")
-    })
+    @ApiOperation(value = "Gets the Version Control information for a process group",
+        response = VersionControlInformationEntity.class,
+        notes = NON_GUARANTEED_ENDPOINT,
+        authorizations = {
+            @Authorization(value = "Read - /process-groups/{uuid}")
+        })
     @ApiResponses(value = {
         @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
         @ApiResponse(code = 401, message = "Client could not be authenticated."),
@@ -164,7 +171,9 @@ public class VersionsResource extends ApplicationResource {
     @Produces(MediaType.APPLICATION_JSON)
     @Path("start-requests")
     @ApiOperation(
-            value = "Creates a request so that a Process Group can be placed under Version Control or have its Version Control configuration changed",
+        value = "Creates a request so that a Process Group can be placed under Version Control or have its Version Control configuration changed. Creating this request will "
+            + "prevent any other threads from simultaneously saving local changes to Version Control. It will not, however, actually save the local flow to the Flow Registry. A "
+            + "POST to /versions/process-groups/{id} should be used to initiate saving of the local flow to the Flow Registry.",
             response = VersionControlInformationEntity.class,
             notes = NON_GUARANTEED_ENDPOINT)
     @ApiResponses(value = {
@@ -305,7 +314,8 @@ public class VersionsResource extends ApplicationResource {
     @Produces(MediaType.APPLICATION_JSON)
     @Path("start-requests/{id}")
     @ApiOperation(
-            value = "Deletes the request with the given ID",
+        value = "Deletes the Version Control Request with the given ID. This will allow other threads to save flows to the Flow Registry. See also the documentation "
+            + "for POSTing to /versions/start-requests for information regarding why this is done.",
             notes = NON_GUARANTEED_ENDPOINT)
     @ApiResponses(value = {
         @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@@ -349,7 +359,8 @@ public class VersionsResource extends ApplicationResource {
     @Produces(MediaType.APPLICATION_JSON)
     @Path("process-groups/{id}")
     @ApiOperation(
-            value = "Begins version controlling the Process Group with the given ID",
+            value = "Begins version controlling the Process Group with the given ID or commits changes to the Versioned Flow, "
+                + "depending on if the provided VersionControlInformation includes a flowId",
             response = VersionControlInformationEntity.class,
             notes = NON_GUARANTEED_ENDPOINT,
             authorizations = {
@@ -365,7 +376,7 @@ public class VersionsResource extends ApplicationResource {
         @ApiResponse(code = 404, message = "The specified resource could not be found."),
         @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
     })
-    public Response startVersionControl(
+    public Response saveToFlowRegistry(
         @ApiParam("The process group id.") @PathParam("id") final String groupId,
         @ApiParam(value = "The versioned flow details.", required = true) final StartVersionControlRequestEntity requestEntity) throws IOException {
 
@@ -402,29 +413,7 @@ public class VersionsResource extends ApplicationResource {
             final URI requestUri;
             try {
                 final URI originalUri = getAbsolutePath();
-                final URI createRequestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
-                    originalUri.getPort(), "/nifi-api/versions/start-requests", null, originalUri.getFragment());
-
-                final NodeResponse clusterResponse;
-                try {
-                    if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
-                        clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
-                    } else {
-                        clusterResponse = getRequestReplicator().forwardToCoordinator(
-                            getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
-                    }
-                } catch (final InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie);
-                }
-
-                if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
-                    final String errorResponse = getResponseEntity(clusterResponse, String.class);
-                    throw new IllegalStateException(
-                        "Failed to create a Version Control Request across all nodes in the cluster. Received response code " + clusterResponse.getStatus() + " with content: " + errorResponse);
-                }
-
-                final String requestId = getResponseEntity(clusterResponse, String.class);
+                final String requestId = lockVersionControl(originalUri, groupId);
 
                 requestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
                     originalUri.getPort(), "/nifi-api/versions/start-requests/" + requestId, null, originalUri.getFragment());
@@ -439,54 +428,12 @@ public class VersionsResource extends ApplicationResource {
             // Finally, we can delete the Request.
             try {
                 final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, requestEntity);
-
-                final Map<String, String> headers = new HashMap<>();
-                headers.put("content-type", MediaType.APPLICATION_JSON);
-
-                final NodeResponse clusterResponse;
-                try {
-                    if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
-                        clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse();
-                    } else {
-                        clusterResponse = getRequestReplicator().forwardToCoordinator(
-                            getClusterCoordinatorNode(), HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse();
-                    }
-                } catch (final InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie);
-                }
-
-                if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
-                    final String message = "Failed to update Version Control Information for Process Group with ID " + groupId + ".";
-                    final Throwable cause = clusterResponse.getThrowable();
-                    if (cause == null) {
-                        throw new IllegalStateException(message);
-                    } else {
-                        throw new IllegalStateException(message, cause);
-                    }
-                }
+                replicateVersionControlMapping(mappingEntity, requestEntity, requestUri, groupId);
 
                 final VersionControlInformationEntity responseEntity = serviceFacade.getVersionControlInformation(groupId);
                 return generateOkResponse(responseEntity).build();
             } finally {
-                final NodeResponse clusterResponse;
-                try {
-                    if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
-                        clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
-                    } else {
-                        clusterResponse = getRequestReplicator().forwardToCoordinator(
-                            getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
-                    }
-                } catch (final InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException("After starting Version Control on Process Group with ID " + groupId + ", interrupted while waiting for deletion of Version Control Request. "
-                        + "Users may be unable to Version Control other Process Groups until the request lock times out.", ie);
-                }
-
-                if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
-                    logger.error("After starting Version Control on Process Group with ID " + groupId + ", failed to delete Version Control Request. "
-                        + "Users may be unable to Version Control other Process Groups until the request lock times out. Response status code was " + clusterResponse.getStatus());
-                }
+                unlockVersionControl(requestUri, groupId);
             }
         }
 
@@ -560,13 +507,115 @@ public class VersionsResource extends ApplicationResource {
             });
     }
 
+    /** Issues a DELETE for the Version Control Request at requestUri (replicated or forwarded to the coordinator); a non-OK response is logged, not thrown. */
+    private void unlockVersionControl(final URI requestUri, final String groupId) {
+        final NodeResponse deleteResponse;
+        try {
+            deleteResponse = getReplicationTarget() == ReplicationTarget.CLUSTER_NODES
+                ? getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse()
+                : getRequestReplicator().forwardToCoordinator(
+                    getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
+        } catch (final InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("After starting Version Control on Process Group with ID " + groupId + ", interrupted while waiting for deletion of Version Control Request. "
+                + "Users may be unable to Version Control other Process Groups until the request lock times out.", interrupted);
+        }
+
+        final int statusCode = deleteResponse.getStatus();
+        if (statusCode != Status.OK.getStatusCode()) {
+            logger.error("After starting Version Control on Process Group with ID " + groupId + ", failed to delete Version Control Request. "
+                + "Users may be unable to Version Control other Process Groups until the request lock times out. Response status code was " + statusCode);
+        }
+    }
+
+    /** Issues a POST to /nifi-api/versions/start-requests (replicated or forwarded to the coordinator) and returns the response entity, which is the new request's ID. */
+    private String lockVersionControl(final URI originalUri, final String groupId) throws URISyntaxException {
+        final URI createRequestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+            originalUri.getPort(), "/nifi-api/versions/start-requests", null, originalUri.getFragment());
+
+        final NodeResponse postResponse;
+        try {
+            postResponse = getReplicationTarget() == ReplicationTarget.CLUSTER_NODES
+                ? getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse()
+                : getRequestReplicator().forwardToCoordinator(
+                    getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
+        } catch (final InterruptedException interrupted) {
+            // Restore the interrupt flag before converting to an unchecked exception.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", interrupted);
+        }
+
+        // The response entity is the request ID on success and the error content otherwise.
+        final String responseBody = getResponseEntity(postResponse, String.class);
+        if (postResponse.getStatus() == Status.OK.getStatusCode()) {
+            return responseBody;
+        }
+
+        throw new IllegalStateException(
+            "Failed to create a Version Control Request across all nodes in the cluster. Received response code " + postResponse.getStatus() + " with content: " + responseBody);
+    }
+
+    /** Replicates the mapping entity to the cluster via PUT to requestUri; if that fails, a flow that had to be created for this request is deleted again (best effort). */
+    private void replicateVersionControlMapping(final VersionControlComponentMappingEntity mappingEntity, final StartVersionControlRequestEntity requestEntity, final URI requestUri,
+        final String groupId) {
+        final Map<String, String> headers = new HashMap<>();
+        headers.put("content-type", MediaType.APPLICATION_JSON);
+
+        final NodeResponse clusterResponse;
+        try {
+            if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse();
+            } else {
+                clusterResponse = getRequestReplicator().forwardToCoordinator(
+                    getClusterCoordinatorNode(), HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse();
+            }
+        } catch (final InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            deleteFlowIfNewlyCreated(mappingEntity, requestEntity);
+            throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie);
+        }
+
+        if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
+            deleteFlowIfNewlyCreated(mappingEntity, requestEntity);
+
+            final String message = "Failed to update Version Control Information for Process Group with ID " + groupId + ".";
+            final Throwable cause = clusterResponse.getThrowable();
+            if (cause == null) {
+                throw new IllegalStateException(message);
+            } else {
+                throw new IllegalStateException(message, cause);
+            }
+        }
+    }
+
+    /**
+     * Shared rollback for both replication failure paths (interrupted wait and non-OK response).
+     * Does nothing when the request supplied a flowId; otherwise deletes the flow named by the
+     * mapping entity's Version Control Information. A failed deletion is logged, never thrown.
+     */
+    private void deleteFlowIfNewlyCreated(final VersionControlComponentMappingEntity mappingEntity, final StartVersionControlRequestEntity requestEntity) {
+        if (requestEntity.getVersionedFlow().getFlowId() != null) {
+            return;
+        }
+
+        // We had to create the flow for this snapshot. Since we failed to replicate the Version Control Info, remove the
+        // flow from the Flow Registry (use best effort; if we can't remove it, just log and move on).
+        final VersionControlInformationDTO vci = mappingEntity.getVersionControlInformation();
+        try {
+            serviceFacade.deleteVersionedFlow(vci.getRegistryId(), vci.getBucketId(), vci.getFlowId());
+        } catch (final Exception e) {
+            logger.error("Created Versioned Flow with ID {} in bucket with ID {} but failed to replicate the Version Control Information to cluster. "
+                + "Attempted to delete the newly created (empty) flow from the Flow Registry but failed", vci.getFlowId(), vci.getBucketId(), e);
+        }
+    }
+
 
     @DELETE
     @Consumes(MediaType.WILDCARD)
     @Produces(MediaType.APPLICATION_JSON)
     @Path("process-groups/{id}")
     @ApiOperation(
-            value = "Stops version controlling the Process Group with the given ID",
+            value = "Stops version controlling the Process Group with the given ID. The Process Group will no longer track to any Versioned Flow.",
             response = VersionControlInformationEntity.class,
             notes = NON_GUARANTEED_ENDPOINT,
             authorizations = {
@@ -626,7 +675,8 @@ public class VersionsResource extends ApplicationResource {
     @Produces(MediaType.APPLICATION_JSON)
     @Path("process-groups/{id}")
     @ApiOperation(
-            value = "For a Process Group that is already under Version Control, this will update the version of the flow to a different version",
+            value = "For a Process Group that is already under Version Control, this will update the version of the flow to a different version. This endpoint expects "
+                + "that the given snapshot will not modify any Processor that is currently running or any Controller Service that is enabled.",
             response = VersionControlInformationEntity.class,
             notes = NON_GUARANTEED_ENDPOINT,
             authorizations = {
@@ -689,14 +739,17 @@ public class VersionsResource extends ApplicationResource {
                 serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, false);
             },
             (rev, entity) -> {
+                final Bucket bucket = flowSnapshot.getBucket();
+                final VersionedFlow flow = flowSnapshot.getFlow();
+
                 // Update the Process Group to match the proposed flow snapshot
                 final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO();
                 versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier());
-                versionControlInfoDto.setBucketName(snapshotMetadata.getBucketName());
+                versionControlInfoDto.setBucketName(bucket.getName());
                 versionControlInfoDto.setCurrent(true);
                 versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier());
-                versionControlInfoDto.setFlowName(snapshotMetadata.getFlowName());
-                versionControlInfoDto.setFlowDescription(snapshotMetadata.getFlowDescription());
+                versionControlInfoDto.setFlowName(flow.getName());
+                versionControlInfoDto.setFlowDescription(flow.getDescription());
                 versionControlInfoDto.setGroupId(groupId);
                 versionControlInfoDto.setModified(false);
                 versionControlInfoDto.setVersion(snapshotMetadata.getVersion());
@@ -720,7 +773,9 @@ public class VersionsResource extends ApplicationResource {
     @Produces(MediaType.APPLICATION_JSON)
     @Path("update-requests/{id}")
     @ApiOperation(
-            value = "Returns the Update Request with the given ID",
+            value = "Returns the Update Request with the given ID. Once an Update Request has been created by performing a POST to /versions/update-requests/process-groups/{id}, "
+                + "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the "
+                + "current state of the request, and any failures.",
             response = VersionedFlowUpdateRequestEntity.class,
             notes = NON_GUARANTEED_ENDPOINT,
             authorizations = {
@@ -741,7 +796,9 @@ public class VersionsResource extends ApplicationResource {
     @Produces(MediaType.APPLICATION_JSON)
     @Path("revert-requests/{id}")
     @ApiOperation(
-            value = "Returns the Revert Request with the given ID",
+            value = "Returns the Revert Request with the given ID. Once a Revert Request has been created by performing a POST to /versions/revert-requests/process-groups/{id}, "
+                + "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the "
+                + "current state of the request, and any failures.",
             response = VersionedFlowUpdateRequestEntity.class,
             notes = NON_GUARANTEED_ENDPOINT,
             authorizations = {
@@ -795,7 +852,9 @@ public class VersionsResource extends ApplicationResource {
     @Produces(MediaType.APPLICATION_JSON)
     @Path("update-requests/{id}")
     @ApiOperation(
-            value = "Deletes the Update Request with the given ID",
+        value = "Deletes the Update Request with the given ID. After a request is created via a POST to /versions/update-requests/process-groups/{id}, it is expected "
+            + "that the client will properly clean up the request by DELETE'ing it, once the Update process has completed. If the request is deleted before the request "
+            + "completes, then the Update request will finish the step that it is currently performing and then will cancel any subsequent steps.",
             response = VersionedFlowUpdateRequestEntity.class,
             notes = NON_GUARANTEED_ENDPOINT,
             authorizations = {
@@ -816,7 +875,9 @@ public class VersionsResource extends ApplicationResource {
     @Produces(MediaType.APPLICATION_JSON)
     @Path("revert-requests/{id}")
     @ApiOperation(
-            value = "Deletes the Revert Request with the given ID",
+            value = "Deletes the Revert Request with the given ID. After a request is created via a POST to /versions/revert-requests/process-groups/{id}, it is expected "
+                + "that the client will properly clean up the request by DELETE'ing it, once the Revert process has completed. If the request is deleted before the request "
+            + "completes, then the Revert request will finish the step that it is currently performing and then will cancel any subsequent steps.",
             response = VersionedFlowUpdateRequestEntity.class,
             notes = NON_GUARANTEED_ENDPOINT,
             authorizations = {
@@ -881,7 +942,12 @@ public class VersionsResource extends ApplicationResource {
     @Path("update-requests/process-groups/{id}")
     @ApiOperation(
             value = "For a Process Group that is already under Version Control, this will initiate the action of changing "
-                    + "from a specific version of the flow in the Flow Registry to a different version of the flow.",
+                + "from a specific version of the flow in the Flow Registry to a different version of the flow. This can be a lengthy "
+                + "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, "
+                + "the endpoint will immediately return a VersionedFlowUpdateRequestEntity, and the process of updating the flow will occur "
+                + "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to "
+                + "/versions/update-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to "
+                + "/versions/update-requests/{requestId}.",
             response = VersionedFlowUpdateRequestEntity.class,
             notes = NON_GUARANTEED_ENDPOINT,
             authorizations = {
@@ -1058,8 +1124,13 @@ public class VersionsResource extends ApplicationResource {
     @Path("revert-requests/process-groups/{id}")
     @ApiOperation(
             value = "For a Process Group that is already under Version Control, this will initiate the action of reverting "
-                + "any changes that have been made to the Process Group since it was last synchronized with the Flow Registry. This will result in the "
-                + "flow matching the Versioned Flow that exists in the Flow Registry.",
+                + "any local changes that have been made to the Process Group since it was last synchronized with the Flow Registry. This will result in the "
+                + "flow matching the Versioned Flow that exists in the Flow Registry. This can be a lengthy "
+                + "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, "
+                + "the endpoint will immediately return a VersionedFlowUpdateRequestEntity, and the process of updating the flow will occur "
+                + "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to "
+                + "/versions/revert-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to "
+                + "/versions/revert-requests/{requestId}.",
             response = VersionedFlowUpdateRequestEntity.class,
             notes = NON_GUARANTEED_ENDPOINT,
             authorizations = {
@@ -1174,34 +1245,6 @@ public class VersionsResource extends ApplicationResource {
                     throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with.");
                 }
 
-                // If the information passed in is correct, but there have been no changes, there is nothing to do - just register the request, mark it complete, and return.
-                if (currentVersion.getModified() == Boolean.FALSE) {
-                    final String requestId = UUID.randomUUID().toString();
-                    final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Complete");
-                    requestManager.submitRequest("revert-requests", requestId, request, task -> {
-                    });
-
-                    // There is nothing to do. Generate the response and send it back to the user.
-                    final VersionedFlowUpdateRequestDTO updateRequestDto = new VersionedFlowUpdateRequestDTO();
-                    updateRequestDto.setComplete(true);
-                    updateRequestDto.setFailureReason(null);
-                    updateRequestDto.setLastUpdated(new Date());
-                    updateRequestDto.setProcessGroupId(groupId);
-                    updateRequestDto.setRequestId(requestId);
-                    updateRequestDto.setVersionControlInformation(currentVersion);
-                    updateRequestDto.setUri(generateResourceUri("versions", "revert-requests", requestId));
-                    updateRequestDto.setPercentCompleted(100);
-                    updateRequestDto.setState(request.getState());
-
-                    final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity();
-                    updateRequestEntity.setProcessGroupRevision(revisionDto);
-                    updateRequestEntity.setRequest(updateRequestDto);
-
-                    request.markComplete(currentVersionEntity);
-                    return generateOkResponse(updateRequestEntity).build();
-                }
-
-
                 // Create an asynchronous request that will occur in the background, because this request may
                 // result in stopping components, which can take an indeterminate amount of time.
                 final String requestId = UUID.randomUUID().toString();
@@ -1331,7 +1374,25 @@ public class VersionsResource extends ApplicationResource {
             // Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed
             final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
             final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId);
-            final VersionControlInformationDTO vci = requestEntity.getVersionControlInformation();
+            final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation();
+
+            final Bucket bucket = flowSnapshot.getBucket();
+            final VersionedFlow flow = flowSnapshot.getFlow();
+
+            final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
+            final VersionControlInformationDTO vci = new VersionControlInformationDTO();
+            vci.setBucketId(metadata.getBucketIdentifier());
+            vci.setBucketName(bucket.getName());
+            vci.setCurrent(flowSnapshot.isLatest());
+            vci.setFlowDescription(flow.getDescription());
+            vci.setFlowId(flow.getIdentifier());
+            vci.setFlowName(flow.getName());
+            vci.setGroupId(groupId);
+            vci.setModified(false);
+            vci.setRegistryId(requestVci.getRegistryId());
+            vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId()));
+            vci.setVersion(metadata.getVersion());
+
             serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index bd603be..1a12dcf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,6 +16,33 @@
  */
 package org.apache.nifi.web.api.dto;
 
+import java.text.Collator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
@@ -188,32 +215,6 @@ import org.apache.nifi.web.api.entity.VariableEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.revision.RevisionManager;
 
-import javax.ws.rs.WebApplicationException;
-import java.text.Collator;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
 public final class DtoFactory {
 
     @SuppressWarnings("rawtypes")
@@ -2230,8 +2231,8 @@ public final class DtoFactory {
         dto.setFlowName(versionControlInfo.getFlowName());
         dto.setFlowDescription(versionControlInfo.getFlowDescription());
         dto.setVersion(versionControlInfo.getVersion());
-        dto.setCurrent(versionControlInfo.getCurrent().orElse(true));
-        dto.setModified(versionControlInfo.getModified().orElse(false));
+        dto.setCurrent(versionControlInfo.isCurrent());
+        dto.setModified(versionControlInfo.isModified());
         return dto;
     }