Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/03/02 21:04:39 UTC

[GitHub] [nifi] exceptionfactory commented on a change in pull request #4846: NIFI-8260 Implement an Upload File capability in the NiFi UI for flow definitions.

exceptionfactory commented on a change in pull request #4846:
URL: https://github.com/apache/nifi/pull/4846#discussion_r585908114



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
##########
@@ -4173,6 +4158,264 @@ private void sanitizeRegistryInfo(final VersionedProcessGroup versionedProcessGr
         }
     }
 
+    /**
+     * Uploads the specified versioned flow definition and adds it to a new process group.
+     *
+     * @param httpServletRequest request
+     * @param in The flow definition stream
+     * @return A processGroupEntity
+     * @throws InterruptedException if the thread is interrupted while awaiting the replicated response
+     */
+    @POST
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/process-groups/upload")
+    @ApiOperation(
+            value = "Uploads a versioned flow definition and creates a process group",
+            response = ProcessGroupEntity.class,
+            authorizations = {
+                    @Authorization(value = "Write - /process-groups/{uuid}")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response uploadProcessGroup(
+            @Context final HttpServletRequest httpServletRequest,
+            @ApiParam(
+                    value = "The process group id.",
+                    required = true
+            )
+            @PathParam("id") final String groupId,
+            @ApiParam(
+                    value = "The process group name.",
+                    required = true
+            )
+            @FormDataParam("groupName") final String groupName,
+            @ApiParam(
+                    value = "The process group X position.",
+                    required = true
+            )
+            @FormDataParam("positionX") final Double positionX,
+            @ApiParam(
+                    value = "The process group Y position.",
+                    required = true
+            )
+            @FormDataParam("positionY") final Double positionY,
+            @ApiParam(
+                    value = "The client id.",
+                    required = true
+            )
+            @FormDataParam("clientId") final String clientId,
+            @ApiParam(
+                    value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.",
+                    required = false
+            )
+            @FormDataParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
+            @FormDataParam("file") final InputStream in) throws InterruptedException {
+
+        // ensure the group name is specified
+        if (StringUtils.isBlank(groupName)) {
+            throw new IllegalArgumentException("The process group name is required.");
+        }
+
+        if (StringUtils.isBlank(groupId)) {
+            throw new IllegalArgumentException("The parent process group id is required");
+        }
+
+        if (positionX == null) {
+            throw new IllegalArgumentException("The x coordinate of the proposed position must be specified.");
+        }
+
+        if (positionY == null) {
+            throw new IllegalArgumentException("The y coordinate of the proposed position must be specified.");
+        }
+
+        if (StringUtils.isBlank(clientId)) {
+            throw new IllegalArgumentException("The client id must be specified");
+        }
+
+        // deserialize InputStream to a VersionedFlowSnapshot
+        VersionedFlowSnapshot deserializedSnapshot;
+
+        try {
+            deserializedSnapshot = MAPPER.readValue(in, VersionedFlowSnapshot.class);
+        } catch (IOException e) {
+            logger.warn("Deserialization of uploaded JSON failed", e);
+            throw new IllegalArgumentException("Deserialization of uploaded JSON failed", e);
+        }
+
+        // clear Registry info
+        sanitizeRegistryInfo(deserializedSnapshot.getFlowContents());
+
+        // resolve Bundle info
+        serviceFacade.discoverCompatibleBundles(deserializedSnapshot.getFlowContents());
+
+        // if there are any Controller Services referenced that are inherited from the parent group,
+        // resolve those to point to the appropriate Controller Service, if we are able to.
+        serviceFacade.resolveInheritedControllerServices(deserializedSnapshot, groupId, NiFiUserUtils.getNiFiUser());
+
+        if (isDisconnectedFromCluster()) {
+            verifyDisconnectedNodeModification(disconnectedNodeAcknowledged);
+        }
+
+        // build the response entity for a replicate request
+        ProcessGroupUploadEntity pgUploadEntity = new ProcessGroupUploadEntity();
+        pgUploadEntity.setId(groupId);
+        pgUploadEntity.setGroupName(groupName);
+        pgUploadEntity.setPositionX(positionX);
+        pgUploadEntity.setPositionY(positionY);
+        pgUploadEntity.setClientId(clientId);
+        pgUploadEntity.setDisconnectedNodeAcknowledged(disconnectedNodeAcknowledged);
+        pgUploadEntity.setFlowSnapshot(deserializedSnapshot);
+
+        // replicate the request
+        if (isReplicateRequest()) {
+            // convert request accordingly
+            final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
+            uriBuilder.segment("process-groups", groupId, "process-groups", "import");
+            final URI importUri = uriBuilder.build();
+
+            final Map<String, String> headersToOverride = new HashMap<>();
+            headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
+
+            // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
+            // to the cluster nodes themselves.
+            if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                return getRequestReplicator().replicate(HttpMethod.POST, importUri, pgUploadEntity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
+            } else {
+                return getRequestReplicator().forwardToCoordinator(
+                        getClusterCoordinatorNode(), HttpMethod.POST, importUri, pgUploadEntity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
+            }
+        }
+
+        // otherwise import the process group locally
+        return importProcessGroup(httpServletRequest, groupId, pgUploadEntity);
+    }
+
+    /**
+     * Imports the specified process group.
+     *
+     * @param httpServletRequest request
+     * @param processGroupUploadEntity     A ProcessGroupUploadEntity.
+     * @return A processGroupEntity.
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/process-groups/import")
+    @ApiOperation(
+            value = "Imports a specified process group",
+            response = ProcessGroupEntity.class,
+            authorizations = {
+                    @Authorization(value = "Write - /process-groups/{uuid}")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response importProcessGroup(
+            @Context final HttpServletRequest httpServletRequest,

Review comment:
       Is the `httpServletRequest` parameter necessary? It does not appear to be used.



