You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/02/05 21:28:42 UTC

[GitHub] [nifi] markap14 commented on a change in pull request #4023: NIFI-6873: Added support for replacing a process group via import

markap14 commented on a change in pull request #4023: NIFI-6873: Added support for replacing a process group via import
URL: https://github.com/apache/nifi/pull/4023#discussion_r374333221
 
 

 ##########
 File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
 ##########
 @@ -3894,6 +3889,252 @@ public Response createControllerService(
         );
     }
 
+    /**
+     * Initiates the request to replace the Process Group with the given ID with the Process Group in the given import entity
+     *
+     * @param groupId          The id of the process group to replace
+     * @param importEntity     A request entity containing revision info and the process group to replace with
+     * @return A ProcessGroupReplaceRequestEntity.
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/replace-requests")
+    @ApiOperation(
+            value = "Initiate the Replace Request of a Process Group with the given ID",
+            response = ProcessGroupReplaceRequestEntity.class,
+            notes = "This will initiate the action of replacing a process group with the given process group. This can be a lengthy "
+                    + "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, "
+                    + "the endpoint will immediately return a ProcessGroupReplaceRequestEntity, and the process of replacing the flow will occur "
+                    + "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to "
+                    + "/process-groups/replace-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to "
+                    + "/process-groups/replace-requests/{requestId}. " + NON_GUARANTEED_ENDPOINT,
+            authorizations = {
+                    @Authorization(value = "Read - /process-groups/{uuid}"),
+                    @Authorization(value = "Write - /process-groups/{uuid}"),
+                    @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"),
+                    @Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated components"),
+                    @Authorization(value = "Write - if the template contains any restricted components - /restricted-components"),
+                    @Authorization(value = "Read - /parameter-contexts/{uuid} - For any Parameter Context that is referenced by a Property that is changed, added, or removed")
+            }
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response initiateReplaceProcessGroup(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
+                                                @ApiParam(value = "The process group replace request entity", required = true) final ProcessGroupImportEntity importEntity) {
+        // replacing a flow under version control is not permitted via import. Versioned flows have additional requirements to allow
+        // them only to be replaced by a different version of the same flow.
+        if (serviceFacade.isAnyProcessGroupUnderVersionControl(groupId)) {
+            throw new IllegalStateException("Cannot replace a Process Group via import while it or its descendants are under Version Control.");
+        }
+
+        final VersionedFlowSnapshot versionedFlowSnapshot = importEntity.getVersionedFlowSnapshot();
+        if (versionedFlowSnapshot == null) {
+            throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied");
+        }
+
+        return initiateFlowUpdate(groupId, importEntity, true, "replace-requests",
+                "/nifi-api/process-groups/" + groupId + "/replace", importEntity::getVersionedFlowSnapshot);
+    }
+
+    /**
+     * Replace the Process Group with the given ID with the specified Process Group.
+     *
+     * This is the endpoint used in a cluster update replication scenario.
+     *
+     * @param groupId          The id of the process group to replace
+     * @param importEntity     A request entity containing revision info and the process group to replace with
+     * @return A ProcessGroupImportEntity.
+     */
+    @PUT
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/replace")
+    @ApiOperation(
+            value = "Replace Process Group with the given ID with the specified Process Group",
+            response = ProcessGroupImportEntity.class,
+            notes = "This endpoint is used for replication within a cluster, when replacing a flow with a new flow. It expects that the flow being"
+                    + "replaced is not under version control and that the given snapshot will not modify any Processor that is currently running "
+                    + "or any Controller Service that is enabled. "
+                    + NON_GUARANTEED_ENDPOINT,
+            authorizations = {
+                    @Authorization(value = "Read - /process-groups/{uuid}"),
+                    @Authorization(value = "Write - /process-groups/{uuid}")
+            })
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response replaceProcessGroup(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
+                                        @ApiParam(value = "The process group replace request entity.", required = true) final ProcessGroupImportEntity importEntity) {
+
+        // Verify the request
+        final RevisionDTO revisionDto = importEntity.getProcessGroupRevision();
+        if (revisionDto == null) {
+            throw new IllegalArgumentException("Process Group Revision must be specified.");
+        }
+
+        final VersionedFlowSnapshot requestFlowSnapshot = importEntity.getVersionedFlowSnapshot();
+        if (requestFlowSnapshot == null) {
+            throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied.");
+        }
+
+        // Perform the request
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.PUT, importEntity);
+        } else if (isDisconnectedFromCluster()) {
+            verifyDisconnectedNodeModification(importEntity.isDisconnectedNodeAcknowledged());
+        }
+
+        final Revision requestRevision = getRevision(importEntity.getProcessGroupRevision(), groupId);
+        return withWriteLock(
+                serviceFacade,
+                importEntity,
+                requestRevision,
+                lookup -> {
+                    final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+                    final Authorizable processGroup = groupAuthorizable.getAuthorizable();
+                    processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+                    processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+                },
+                () -> {
+                    // We do not enforce that the Process Group is 'not dirty' because at this point,
+                    // the client has explicitly indicated the dataflow that the Process Group should
+                    // provide and provided the Revision to ensure that they have the most up-to-date
+                    // view of the Process Group.
+                    serviceFacade.verifyCanUpdate(groupId, requestFlowSnapshot, true, false);
+                },
+                (revision, entity) -> {
+                    final ProcessGroupEntity updatedGroup =
+                            performUpdateFlow(groupId, revision, importEntity, entity.getVersionedFlowSnapshot(),
+                                    getIdGenerationSeed().orElse(null), false, true);
+
+                    // response to replication request is an entity with revision info but no versioned flow snapshot
+                    final ProcessGroupImportEntity responseEntity = new ProcessGroupImportEntity();
+                    responseEntity.setProcessGroupRevision(updatedGroup.getRevision());
+
+                    return generateOkResponse(responseEntity).build();
+                });
+    }
+
+    /**
+     * Retrieve a request to replace a Process Group by request ID.
+     *
+     * @param replaceRequestId  The ID of the replace request
+     * @return A ProcessGroupReplaceRequestEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("replace-requests/{id}")
+    @ApiOperation(
+            value = "Returns the Replace Request with the given ID",
+            response = ProcessGroupReplaceRequestEntity.class,
+            notes = "Returns the Replace Request with the given ID. Once a Replace Request has been created by performing a POST to /process-groups/{id}/replace-requests, "
+                    + "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the "
+                    + "current state of the request, and any failures. "
+                    + NON_GUARANTEED_ENDPOINT,
+            authorizations = {
+                    @Authorization(value = "Only the user that submitted the request can get it")
+            })
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response getReplaceProcessGroupRequest(
+            @ApiParam("The ID of the Replace Request") @PathParam("id") final String replaceRequestId) {
+        return retrieveFlowUpdateRequest("replace-requests", replaceRequestId);
+    }
+
+    @DELETE
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("replace-requests/{id}")
+    @ApiOperation(
+            value = "Deletes the Replace Request with the given ID",
+            response = ProcessGroupReplaceRequestEntity.class,
+            notes = "Deletes the Replace Request with the given ID. After a request is created via a POST to /process-groups/{id}/replace-requests, it is expected "
+                    + "that the client will properly clean up the request by DELETE'ing it, once the Replace process has completed. If the request is deleted before the request "
+                    + "completes, then the Replace request will finish the step that it is currently performing and then will cancel any subsequent steps. "
+                    + NON_GUARANTEED_ENDPOINT,
+            authorizations = {
+                    @Authorization(value = "Only the user that submitted the request can remove it")
+            })
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response deleteReplaceProcessGroupRequest(
+            @ApiParam(value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.", required = false)
+                @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
+            @ApiParam("The ID of the Update Request") @PathParam("id") final String replaceRequestId) {
+        return deleteFlowUpdateRequest("replace-requests", replaceRequestId, disconnectedNodeAcknowledged.booleanValue());
+    }
+
+    /**
+     * Perform actual flow update of the specified flow. This is used for the initial flow update and replication updates.
+     */
+    @Override
+    protected ProcessGroupEntity performUpdateFlow(final String groupId, final Revision revision, final ProcessGroupImportEntity requestEntity,
+                                                   final VersionedFlowSnapshot flowSnapshot, final String idGenerationSeed,
+                                                   final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) {
+        logger.info("Replacing Process Group with ID {} with imported Process Group with ID {}", groupId, flowSnapshot.getFlowContents().getIdentifier());
+
+        // Step 10-11. Update Process Group to the new flow (including name) and update variable registry with any Variables that were added or removed
 
 Review comment:
   Probably can remove the "Step 10-11" part of this comment... not sure that it makes sense anymore in this context :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services