You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/21 23:30:33 UTC
[07/10] nifi git commit: NIFI-1554: - Introducing new REST endpoints
to align with the authorizable resources. - Additionally changes to support
the new endpoints. - Addressing comments in PR. - This closes #374.
http://git-wip-us.apache.org/repos/asf/nifi/blob/add29816/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
new file mode 100644
index 0000000..6874ad5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api;
+
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.context.ClusterContext;
+import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.DownloadableContent;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
+import org.apache.nifi.web.api.dto.FlowFileDTO;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
+import org.apache.nifi.web.api.entity.FlowFileEntity;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * RESTful endpoint for managing a flowfile queue.
+ */
+@Path("/flowfile-queues")
+@Api(
+ value = "/flowfile-queues",
+ description = "Endpoint for managing a FlowFile Queue."
+)
+public class FlowFileQueueResource extends ApplicationResource {
+
+ // facade through which flowfiles, their content, and listing/drop requests are looked up and managed
+ private NiFiServiceFacade serviceFacade;
+ // cluster manager used to replicate requests when this instance is the cluster manager
+ private WebClusterManager clusterManager;
+ // properties consulted to decide whether this instance is the cluster manager
+ private NiFiProperties properties;
+
+ /**
+ * Fills in the URI of the given listing request and of every flowfile summary it carries.
+ *
+ * @param connectionId id of the connection whose queue was listed
+ * @param flowFileListing the listing request to decorate; modified in place
+ * @return the same listing request instance that was passed in
+ */
+ public ListingRequestDTO populateRemainingFlowFileListingContent(final String connectionId, final ListingRequestDTO flowFileListing) {
+ // the listing itself is addressed beneath the connection's listing-requests
+ final String listingUri = generateResourceUri("flowfile-queues", connectionId, "listing-requests", flowFileListing.getId());
+ flowFileListing.setUri(listingUri);
+
+ // nothing further to decorate when no summaries are attached
+ if (flowFileListing.getFlowFileSummaries() == null) {
+ return flowFileListing;
+ }
+
+ // each summary gets the uri of its own flowfile
+ for (final FlowFileSummaryDTO summary : flowFileListing.getFlowFileSummaries()) {
+ populateRemainingFlowFileContent(connectionId, summary);
+ }
+ return flowFileListing;
+ }
+
+ /**
+ * Fills in the URI of the given flowfile.
+ *
+ * @param connectionId id of the connection whose queue holds the flowfile
+ * @param flowFile the flowfile to decorate; modified in place
+ * @return the same flowfile instance that was passed in
+ */
+ public FlowFileSummaryDTO populateRemainingFlowFileContent(final String connectionId, final FlowFileSummaryDTO flowFile) {
+ // a flowfile is addressed by its uuid beneath the connection's flowfiles
+ final String flowFileUri = generateResourceUri("flowfile-queues", connectionId, "flowfiles", flowFile.getUuid());
+ flowFile.setUri(flowFileUri);
+ return flowFile;
+ }
+
+ /**
+ * Gets the specified flowfile from the specified connection.
+ *
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param connectionId The connection id
+ * @param flowFileUuid The flowfile uuid
+ * @param clusterNodeId The cluster node id where the flowfile resides
+ * @return a flowFileDTO
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{connection-id}/flowfiles/{flowfile-uuid}")
+ // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Gets a FlowFile from a Connection.",
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response getFlowFile(
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String connectionId,
+ @ApiParam(
+ value = "The flowfile uuid.",
+ required = true
+ )
+ @PathParam("flowfile-uuid") String flowFileUuid,
+ @ApiParam(
+ value = "The id of the node where the content exists if clustered.",
+ required = false
+ )
+ @QueryParam("clusterNodeId") String clusterNodeId) {
+
+ // replicate if cluster manager
+ if (properties.isClusterManager()) {
+ // determine where this request should be sent
+ if (clusterNodeId == null) {
+ throw new IllegalArgumentException("The id of the node in the cluster is required.");
+ } else {
+ // get the target node and ensure it exists
+ final Node targetNode = clusterManager.getNode(clusterNodeId);
+ if (targetNode == null) {
+ throw new UnknownNodeException("The specified cluster node does not exist.");
+ }
+
+ final Set<NodeIdentifier> targetNodes = new HashSet<>();
+ targetNodes.add(targetNode.getNodeId());
+
+ // replicate the request to the specific node
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+ }
+ }
+
+ // get the flowfile
+ final FlowFileDTO flowfileDto = serviceFacade.getFlowFile(connectionId, flowFileUuid);
+ populateRemainingFlowFileContent(connectionId, flowfileDto);
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+ revision.setClientId(clientId.getClientId());
+
+ // create the response entity
+ final FlowFileEntity entity = new FlowFileEntity();
+ entity.setRevision(revision);
+ entity.setFlowFile(flowfileDto);
+
+ return generateOkResponse(entity).build();
+ }
+
+ /**
+ * Streams the content of the specified flowfile in the specified connection back to the caller as an attachment.
+ * <p>
+ * When this instance is the cluster manager the request is replicated to the single node identified by
+ * {@code clusterNodeId} instead of being answered locally.
+ *
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param connectionId The connection id
+ * @param flowFileUuid The flowfile uuid
+ * @param clusterNodeId The cluster node id
+ * @return The content stream
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.WILDCARD)
+ @Path("{connection-id}/flowfiles/{flowfile-uuid}/content")
+ // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Gets the content for a FlowFile in a Connection.",
+ authorizations = {@Authorization(value = "Data Flow Manager", type = "ROLE_DFM")}
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ })
+ public Response downloadFlowFileContent(
+ @ApiParam(value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false)
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(value = "The connection id.", required = true)
+ @PathParam("connection-id") String connectionId,
+ @ApiParam(value = "The flowfile uuid.", required = true)
+ @PathParam("flowfile-uuid") String flowFileUuid,
+ @ApiParam(value = "The id of the node where the content exists if clustered.", required = false)
+ @QueryParam("clusterNodeId") String clusterNodeId) {
+
+ // as cluster manager, forward the request to the one node named by the caller
+ if (properties.isClusterManager()) {
+ if (clusterNodeId == null) {
+ throw new IllegalArgumentException("The id of the node in the cluster is required.");
+ }
+
+ final Node node = clusterManager.getNode(clusterNodeId);
+ if (node == null) {
+ throw new UnknownNodeException("The specified cluster node does not exist.");
+ }
+
+ final Set<NodeIdentifier> recipients = new HashSet<>();
+ recipients.add(node.getNodeId());
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), recipients).getResponse();
+ }
+
+ // the uri of this very request is handed to the facade along with the lookup keys
+ final String requestUri = generateResourceUri("flowfile-queues", connectionId, "flowfiles", flowFileUuid, "content");
+ final DownloadableContent content = serviceFacade.getContent(connectionId, flowFileUuid, requestUri);
+
+ // copy the content to the response body when the container asks for it; the stream is closed afterwards
+ final StreamingOutput streamer = new StreamingOutput() {
+ @Override
+ public void write(final OutputStream output) throws IOException, WebApplicationException {
+ try (final InputStream contentStream = content.getContent()) {
+ StreamUtils.copy(contentStream, output);
+ output.flush();
+ }
+ }
+ };
+
+ // fall back to a generic binary type when the content does not declare one
+ final String declaredType = content.getType();
+ final String contentType = declaredType == null ? MediaType.APPLICATION_OCTET_STREAM : declaredType;
+
+ return generateOkResponse(streamer)
+ .type(contentType)
+ .header("Content-Disposition", String.format("attachment; filename=\"%s\"", content.getFilename()))
+ .build();
+ }
+
+ /**
+ * Creates a request to list the flowfiles in the queue of the specified connection.
+ *
+ * @param httpServletRequest request
+ * @param id The id of the connection
+ * @return A listRequestEntity
+ */
+ @POST
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{connection-id}/listing-requests")
+ // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Lists the contents of the queue in this connection.",
+ response = ListingRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 202, message = "The request has been accepted. A HTTP response header will contain the URI where the response can be polled."),
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response createFlowFileListing(
+ @Context HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String id) {
+
+ // replicate if cluster manager
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
+ // handle expects request (usually from the cluster manager)
+ final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+ if (expects != null) {
+ serviceFacade.verifyListQueue(id);
+ return generateContinueResponse().build();
+ }
+
+ // ensure the id is the same across the cluster
+ final String listingRequestId;
+ final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
+ if (clusterContext != null) {
+ listingRequestId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString();
+ } else {
+ listingRequestId = UUID.randomUUID().toString();
+ }
+
+ // submit the listing request
+ final ListingRequestDTO listingRequest = serviceFacade.createFlowFileListingRequest(id, listingRequestId);
+ populateRemainingFlowFileListingContent(id, listingRequest);
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+
+ // create the response entity
+ final ListingRequestEntity entity = new ListingRequestEntity();
+ entity.setRevision(revision);
+ entity.setListingRequest(listingRequest);
+
+ // generate the URI where the response will be
+ final URI location = URI.create(listingRequest.getUri());
+ return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
+ }
+
+ /**
+ * Checks the status of an outstanding listing request.
+ *
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param connectionId The id of the connection
+ * @param listingRequestId The id of the drop request
+ * @return A dropRequestEntity
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{connection-id}/listing-requests/{listing-request-id}")
+ // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Gets the current status of a listing request for the specified connection.",
+ response = ListingRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response getListingRequest(
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String connectionId,
+ @ApiParam(
+ value = "The listing request id.",
+ required = true
+ )
+ @PathParam("listing-request-id") String listingRequestId) {
+
+ // replicate if cluster manager
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
+ // get the listing request
+ final ListingRequestDTO listingRequest = serviceFacade.getFlowFileListingRequest(connectionId, listingRequestId);
+ populateRemainingFlowFileListingContent(connectionId, listingRequest);
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+ revision.setClientId(clientId.getClientId());
+
+ // create the response entity
+ final ListingRequestEntity entity = new ListingRequestEntity();
+ entity.setRevision(revision);
+ entity.setListingRequest(listingRequest);
+
+ return generateOkResponse(entity).build();
+ }
+
+ /**
+ * Deletes the specified listing request.
+ * <p>
+ * The flowfile summaries are stripped from the returned listing request; only its remaining fields and
+ * uri are sent back.
+ *
+ * @param httpServletRequest request
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param connectionId The connection id
+ * @param listingRequestId The listing request id
+ * @return A listingRequestEntity
+ */
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{connection-id}/listing-requests/{listing-request-id}")
+ // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Cancels and/or removes a request to list the contents of this connection.",
+ response = ListingRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response deleteListingRequest(
+ @Context HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String connectionId,
+ @ApiParam(
+ value = "The listing request id.",
+ required = true
+ )
+ @PathParam("listing-request-id") String listingRequestId) {
+
+ // replicate if cluster manager
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
+ // handle expects request (usually from the cluster manager)
+ final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+ if (expects != null) {
+ return generateContinueResponse().build();
+ }
+
+ // delete the listing request
+ final ListingRequestDTO listingRequest = serviceFacade.deleteFlowFileListingRequest(connectionId, listingRequestId);
+
+ // prune the results as they were already received when the listing completed
+ listingRequest.setFlowFileSummaries(null);
+
+ // populate remaining content
+ populateRemainingFlowFileListingContent(connectionId, listingRequest);
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+ revision.setClientId(clientId.getClientId());
+
+ // create the response entity
+ final ListingRequestEntity entity = new ListingRequestEntity();
+ entity.setRevision(revision);
+ entity.setListingRequest(listingRequest);
+
+ return generateOkResponse(entity).build();
+ }
+
+ /**
+ * Creates a request to delete the flowfiles in the queue of the specified connection.
+ *
+ * @param httpServletRequest request
+ * @param id The id of the connection
+ * @return A dropRequestEntity
+ */
+ @POST
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{connection-id}/drop-requests")
+ // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Creates a request to drop the contents of the queue in this connection.",
+ response = DropRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 202, message = "The request has been accepted. A HTTP response header will contain the URI where the response can be polled."),
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response createDropRequest(
+ @Context HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String id) {
+
+ // replicate if cluster manager
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
+ // handle expects request (usually from the cluster manager)
+ final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+ if (expects != null) {
+ return generateContinueResponse().build();
+ }
+
+ // ensure the id is the same across the cluster
+ final String dropRequestId;
+ final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
+ if (clusterContext != null) {
+ dropRequestId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString();
+ } else {
+ dropRequestId = UUID.randomUUID().toString();
+ }
+
+ // submit the drop request
+ final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(id, dropRequestId);
+ dropRequest.setUri(generateResourceUri("flowfile-queues", id, "drop-requests", dropRequest.getId()));
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+
+ // create the response entity
+ final DropRequestEntity entity = new DropRequestEntity();
+ entity.setRevision(revision);
+ entity.setDropRequest(dropRequest);
+
+ // generate the URI where the response will be
+ final URI location = URI.create(dropRequest.getUri());
+ return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
+ }
+
+ /**
+ * Checks the status of an outstanding drop request.
+ *
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param connectionId The id of the connection
+ * @param dropRequestId The id of the drop request
+ * @return A dropRequestEntity
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{connection-id}/drop-requests/{drop-request-id}")
+ // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Gets the current status of a drop request for the specified connection.",
+ response = DropRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response getDropRequest(
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String connectionId,
+ @ApiParam(
+ value = "The drop request id.",
+ required = true
+ )
+ @PathParam("drop-request-id") String dropRequestId) {
+
+ // replicate if cluster manager
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
+ // get the drop request
+ final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(connectionId, dropRequestId);
+ dropRequest.setUri(generateResourceUri("flowfile-queues", connectionId, "drop-requests", dropRequestId));
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+ revision.setClientId(clientId.getClientId());
+
+ // create the response entity
+ final DropRequestEntity entity = new DropRequestEntity();
+ entity.setRevision(revision);
+ entity.setDropRequest(dropRequest);
+
+ return generateOkResponse(entity).build();
+ }
+
+ /**
+ * Deletes the specified drop request.
+ *
+ * @param httpServletRequest request
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param connectionId The connection id
+ * @param dropRequestId The drop request id
+ * @return A dropRequestEntity
+ */
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{connection-id}/drop-requests/{drop-request-id}")
+ // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Cancels and/or removes a request to drop the contents of this connection.",
+ response = DropRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response removeDropRequest(
+ @Context HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String connectionId,
+ @ApiParam(
+ value = "The drop request id.",
+ required = true
+ )
+ @PathParam("drop-request-id") String dropRequestId) {
+
+ // replicate if cluster manager
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
+ // handle expects request (usually from the cluster manager)
+ final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+ if (expects != null) {
+ return generateContinueResponse().build();
+ }
+
+ // delete the drop request
+ final DropRequestDTO dropRequest = serviceFacade.deleteFlowFileDropRequest(connectionId, dropRequestId);
+ dropRequest.setUri(generateResourceUri("flowfile-queues", connectionId, "drop-requests", dropRequestId));
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+ revision.setClientId(clientId.getClientId());
+
+ // create the response entity
+ final DropRequestEntity entity = new DropRequestEntity();
+ entity.setRevision(revision);
+ entity.setDropRequest(dropRequest);
+
+ return generateOkResponse(entity).build();
+ }
+
+ // setters
+ /**
+ * Sets the service facade this resource delegates to.
+ *
+ * @param serviceFacade the service facade
+ */
+ public void setServiceFacade(final NiFiServiceFacade serviceFacade) {
+ this.serviceFacade = serviceFacade;
+ }
+
+ /**
+ * Sets the cluster manager this resource uses to replicate requests.
+ *
+ * @param clusterManager the cluster manager
+ */
+ public void setClusterManager(final WebClusterManager clusterManager) {
+ this.clusterManager = clusterManager;
+ }
+
+ /**
+ * Sets the properties this resource consults to tell whether it is the cluster manager.
+ *
+ * @param properties the NiFi properties
+ */
+ public void setProperties(final NiFiProperties properties) {
+ this.properties = properties;
+ }
+}