You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/10/12 16:02:00 UTC
nifi git commit: NIFI-730: - Starting to add support for deleting
flow files from a queue by creating endpoints and starting to wire everything
together. - Adding context menu item for initiating the request to drop flow
files.
Repository: nifi
Updated Branches:
refs/heads/NIFI-730 b4bfcc1f2 -> e0ac7cde3
NIFI-730:
- Starting to add support for deleting flow files from a queue by creating endpoints and starting to wire everything together.
- Adding context menu item for initiating the request to drop flow files.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e0ac7cde
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e0ac7cde
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e0ac7cde
Branch: refs/heads/NIFI-730
Commit: e0ac7cde372f428b0655465b7adc59ad41f8f270
Parents: b4bfcc1
Author: Matt Gilman <ma...@gmail.com>
Authored: Mon Oct 12 10:00:54 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Mon Oct 12 10:00:54 2015 -0400
----------------------------------------------------------------------
.../apache/nifi/web/api/dto/DropRequestDTO.java | 129 +++++++++++
.../nifi/web/api/entity/DropRequestEntity.java | 44 ++++
.../org/apache/nifi/web/NiFiServiceFacade.java | 25 +++
.../nifi/web/StandardNiFiServiceFacade.java | 24 ++
.../apache/nifi/web/api/ConnectionResource.java | 217 ++++++++++++++++++-
.../org/apache/nifi/web/dao/ConnectionDAO.java | 22 ++
.../web/dao/impl/StandardConnectionDAO.java | 15 ++
.../src/main/webapp/js/nf/canvas/nf-actions.js | 58 +++++
.../main/webapp/js/nf/canvas/nf-context-menu.js | 10 +
9 files changed, 542 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java
new file mode 100644
index 0000000..dd4289f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto;
+
+import com.wordnik.swagger.annotations.ApiModelProperty;
+import java.util.Date;
+import javax.xml.bind.annotation.XmlType;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.web.api.dto.util.TimestampAdapter;
+
+/**
+ * A request to drop the contents of a connection.
+ */
+@XmlType(name = "dropRequest")
+public class DropRequestDTO {
+
+ private String id;
+ private String uri;
+
+ private Date submissionTime;
+ private Date expiration;
+
+ private Integer percentCompleted;
+ private Boolean finished;
+
+ /**
+ * The id for this component.
+ *
+ * @return The id
+ */
+ @ApiModelProperty(
+ value = "The id of the component."
+ )
+ public String getId() {
+ return this.id;
+ }
+
+ public void setId(final String id) {
+ this.id = id;
+ }
+
+ /**
+ * The uri for linking to this component in this NiFi.
+ *
+ * @return The uri
+ */
+ @ApiModelProperty(
+ value = "The URI for futures requests to the component."
+ )
+ public String getUri() {
+ return uri;
+ }
+
+ public void setUri(String uri) {
+ this.uri = uri;
+ }
+
+ /**
+ * @return time the query was submitted
+ */
+ @XmlJavaTypeAdapter(TimestampAdapter.class)
+ @ApiModelProperty(
+ value = "The timestamp when the query was submitted."
+ )
+ public Date getSubmissionTime() {
+ return submissionTime;
+ }
+
+ public void setSubmissionTime(Date submissionTime) {
+ this.submissionTime = submissionTime;
+ }
+
+ /**
+ * @return expiration time of the query results
+ */
+ @XmlJavaTypeAdapter(TimestampAdapter.class)
+ @ApiModelProperty(
+ value = "The timestamp when the query will expire."
+ )
+ public Date getExpiration() {
+ return expiration;
+ }
+
+ public void setExpiration(Date expiration) {
+ this.expiration = expiration;
+ }
+
+ /**
+ * @return percent completed
+ */
+ @ApiModelProperty(
+ value = "The current percent complete."
+ )
+ public Integer getPercentCompleted() {
+ return percentCompleted;
+ }
+
+ public void setPercentCompleted(Integer percentCompleted) {
+ this.percentCompleted = percentCompleted;
+ }
+
+ /**
+ * @return whether the query has finished
+ */
+ @ApiModelProperty(
+ value = "Whether the query has finished."
+ )
+ public Boolean isFinished() {
+ return finished;
+ }
+
+ public void setFinished(Boolean finished) {
+ this.finished = finished;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java
new file mode 100644
index 0000000..078c019
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.web.api.dto.DropRequestDTO;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a DropRequestDTO.
+ */
+@XmlRootElement(name = "dropRequestEntity")
+public class DropRequestEntity extends Entity {
+
+ private DropRequestDTO dropRequest;
+
+ /**
+ * The DropRequestDTO that is being serialized.
+ *
+ * @return The DropRequestDTO object
+ */
+ public DropRequestDTO getDropRequest() {
+ return dropRequest;
+ }
+
+ public void setDropRequest(DropRequestDTO dropRequest) {
+ this.dropRequest = dropRequest;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index c98b1e4..28f6b61 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -43,6 +43,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
@@ -525,6 +526,30 @@ public interface NiFiServiceFacade {
*/
ConfigurationSnapshot<Void> deleteConnection(Revision revision, String groupId, String connectionId);
+ /**
+ * Creates a new flow file drop request.
+ *
+ * @param groupId group
+ * @param connectionId The ID of the connection
+ * @return
+ */
+ DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId);
+
+ /**
+ * Gets the specified flow file drop request.
+ *
+ * @param dropRequestId The flow file drop request
+ * @return The DropRequest
+ */
+ DropRequestDTO getFlowFileDropRequest(String dropRequestId);
+
+ /**
+ * Cancels/removes the specified flow file drop request.
+ *
+ * @param dropRequestId The flow file drop request
+ */
+ void deleteFlowFileDropRequest(String dropRequestId);
+
// ----------------------------------------
// InputPort methods
// ----------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 2286213..7f0a296 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -162,6 +162,7 @@ import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
@@ -809,6 +810,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public void deleteFlowFileDropRequest(String dropRequestId) {
+ // TODO
+ }
+
+ @Override
public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
@@ -1060,6 +1066,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId) {
+ // TODO
+ final DropRequestDTO dto = new DropRequestDTO();
+ dto.setFinished(false);
+ dto.setSubmissionTime(new Date());
+ dto.setExpiration(new Date(System.currentTimeMillis() + 10000));
+ dto.setId(UUID.randomUUID().toString());
+ dto.setPercentCompleted(100);
+ return dto;
+ }
+
+ @Override
public ConfigurationSnapshot<ProcessorDTO> createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
@Override
@@ -2092,6 +2110,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public DropRequestDTO getFlowFileDropRequest(String dropRequestId) {
+ // TODO
+ return null;
+ }
+
+ @Override
public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) {
return controllerFacade.getConnectionStatusHistory(groupId, connectionId);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index 64c14fa..dfc20fb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -24,6 +24,7 @@ import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -49,6 +50,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.util.NiFiProperties;
@@ -71,6 +73,10 @@ import org.apache.nifi.web.api.request.IntegerParameter;
import org.apache.nifi.web.api.request.LongParameter;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.context.ClusterContext;
+import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.prepost.PreAuthorize;
@@ -476,8 +482,7 @@ public class ConnectionResource extends ApplicationResource {
@ApiParam(
value = "The connection configuration details.",
required = true
- )
- ConnectionEntity connectionEntity) {
+ ) ConnectionEntity connectionEntity) {
if (connectionEntity == null || connectionEntity.getConnection() == null) {
throw new IllegalArgumentException("Connection details must be specified.");
@@ -886,6 +891,214 @@ public class ConnectionResource extends ApplicationResource {
return clusterContext(generateOkResponse(entity)).build();
}
+ /**
+ * Drops the flowfiles in the queue of the specified connection.
+ *
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param id The id of the connection
+ * @return A dropRequestEntity
+ */
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("/{connection-id}/contents")
+ @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Drops the contents of the queue in this connection.",
+ response = DropRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response dropQueueContents(
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String id) {
+
+ // replicate if cluster manager
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
+ // ensure the id is the same across the cluster
+ final String dropRequestId;
+ final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
+ if (clusterContext != null) {
+ dropRequestId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString();
+ } else {
+ dropRequestId = UUID.randomUUID().toString();
+ }
+
+ // submit the drop request
+ final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id);
+ dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequestId));
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+ revision.setClientId(clientId.getClientId());
+
+ // create the response entity
+ final DropRequestEntity entity = new DropRequestEntity();
+ entity.setRevision(revision);
+ entity.setDropRequest(dropRequest);
+
+ // generate the URI where the response will be
+ final URI location = URI.create(dropRequest.getUri());
+ if (dropRequest.isFinished()) {
+ return generateCreatedResponse(location, entity).build();
+ } else {
+ return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
+ }
+ }
+
+ /**
+ * Checks the status of an outstanding drop request.
+ *
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param connectionId The id of the connection
+ * @param dropRequestId The id of the drop request
+ * @return A dropRequestEntity
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("/{connection-id}/contents/drop-requests/{drop-request-id}")
+ @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Gets the current status of a drop request for the specified connection.",
+ response = DropRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response getDropRequest(
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String connectionId,
+ @ApiParam(
+ value = "The drop request id.",
+ required = true
+ )
+ @PathParam("drop-request-id") String dropRequestId) {
+
+ // replicate if cluster manager
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
+ // get the drop request
+ final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(dropRequestId);
+ dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId));
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+ revision.setClientId(clientId.getClientId());
+
+ // create the response entity
+ final DropRequestEntity entity = new DropRequestEntity();
+ entity.setRevision(revision);
+ entity.setDropRequest(dropRequest);
+
+ return generateOkResponse(entity).build();
+ }
+
+ /**
+ * Deletes the specified drop request.
+ *
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param connectionId The connection id
+ * @param dropRequestId The drop request id
+ * @return A dropRequestEntity
+ */
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("/{connection-id}/contents/drop-requests/{drop-request-id}")
+ @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Cancels and/or removes a request drop of the contents in this connection.",
+ response = DropRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response removeDropRequest(
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String connectionId,
+ @ApiParam(
+ value = "The drop request id.",
+ required = true
+ )
+ @PathParam("drop-request-id") String dropRequestId) {
+
+ // replicate if cluster manager
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
+ // delete the drop request
+ serviceFacade.deleteFlowFileDropRequest(dropRequestId);
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+ revision.setClientId(clientId.getClientId());
+
+ // create the response entity
+ final DropRequestEntity entity = new DropRequestEntity();
+ entity.setRevision(revision);
+
+ return generateOkResponse(entity).build();
+ }
+
// setters
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
index e0fb89e..ce1d1fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
@@ -32,6 +32,13 @@ public interface ConnectionDAO {
Connection getConnection(String groupId, String id);
/**
+ * Gets the specified flow file drop request.
+ *
+ * @param dropRequestId The drop request id
+ */
+ void getFlowFileDropRequest(String dropRequestId);
+
+ /**
* Gets the connections for the specified source processor.
*
* @param groupId group id
@@ -67,6 +74,14 @@ public interface ConnectionDAO {
Connection createConnection(String groupId, ConnectionDTO connectionDTO);
/**
+ * Creates a new flow file drop request.
+ *
+ * @param groupId group id
+ * @param id connection id
+ */
+ void createFileFlowDropRequest(String groupId, String id);
+
+ /**
* Verifies the create request can be processed.
*
* @param groupId group id
@@ -106,4 +121,11 @@ public interface ConnectionDAO {
* @param id The id of the connection
*/
void deleteConnection(String groupId, String id);
+
+ /**
+ * Deletes the specified flow file drop request.
+ *
+ * @param dropRequestId The drop request id
+ */
+ void deleteFlowFileDropRequest(String dropRequestId);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index 5fbc393..8fa9d3b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -69,6 +69,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
}
@Override
+ public void getFlowFileDropRequest(String dropRequestId) {
+ // TODO
+ }
+
+ @Override
public Set<Connection> getConnectionsForSource(final String groupId, final String processorId) {
final Set<Connection> connections = new HashSet<>(getConnections(groupId));
for (final Iterator<Connection> connectionIter = connections.iterator(); connectionIter.hasNext();) {
@@ -294,6 +299,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
}
@Override
+ public void createFileFlowDropRequest(String groupId, String id) {
+ // TODO
+ }
+
+ @Override
public void verifyCreate(String groupId, ConnectionDTO connectionDTO) {
// validate the incoming request
final List<String> validationErrors = validateProposedConfiguration(groupId, connectionDTO);
@@ -464,6 +474,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
group.removeConnection(connection);
}
+ @Override
+ public void deleteFlowFileDropRequest(String dropRequestId) {
+ // TODO
+ }
+
/* setters */
public void setFlowController(final FlowController flowController) {
this.flowController = flowController;
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
index 3b47a8d..bab2236 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
@@ -846,6 +846,64 @@ nf.Actions = (function () {
},
/**
+ * Deletes the flow files in the specified connection.
+ *
+ * @param {type} selection
+ */
+ deleteQueueContents: function (selection) {
+ if (selection.size() !== 1 || !nf.CanvasUtils.isConnection(selection)) {
+ return;
+ }
+
+ // process the drop request
+ var processDropRequest = function (dropRequest, nextDelay) {
+ // see if the drop request has completed
+ if (dropRequest.finished === true) {
+ deleteDropRequest(dropRequest);
+ } else {
+ schedule(dropRequest, nextDelay);
+ }
+ };
+
+ // schedule for the next poll iteration
+ var schedule = function (dropRequest, delay) {
+ setTimeout(function () {
+ $.ajax({
+ type: 'GET',
+ url: dropRequest.uri,
+ dataType: 'json'
+ }).done(function(response) {
+ var dropRequest = response.dropRequest;
+ processDropRequest(dropRequest, Math.min(8, delay * 2));
+ }).fail(nf.Common.handleAjaxError);
+ }, delay * 1000);
+ };
+
+ // delete the drop request
+ var deleteDropRequest = function (dropRequest) {
+ $.ajax({
+ type: 'DELETE',
+ url: dropRequest.uri,
+ dataType: 'json'
+ }).done(function() {
+ // drop request has been deleted
+ }).fail(nf.Common.handleAjaxError);
+ };
+
+ // get the connection data
+ var connection = selection.datum();
+
+ // issue the request to delete the flow files
+ $.ajax({
+ type: 'DELETE',
+ url: connection.component.uri + '/contents',
+ dataType: 'json'
+ }).done(function(response) {
+ processDropRequest(response.dropRequest, 1);
+ }).fail(nf.Common.handleAjaxError);
+ },
+
+ /**
* Opens the fill color dialog for the component in the specified selection.
*
* @param {type} selection The selection
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
index e652dd4..58397d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
@@ -278,6 +278,15 @@ nf.ContextMenu = (function () {
};
/**
+ * Only DFMs can delete flow files from a connection.
+ *
+ * @param {selection} selection
+ */
+ var canDeleteFlowFiles = function (selection) {
+ return nf.Common.isDFM() && isConnection(selection);
+ };
+
+ /**
* Determines if the components in the specified selection can be moved into a parent group.
*
* @param {type} selection
@@ -373,6 +382,7 @@ nf.ContextMenu = (function () {
{condition: isCopyable, menuItem: {img: 'images/iconCopy.png', text: 'Copy', action: 'copy'}},
{condition: isPastable, menuItem: {img: 'images/iconPaste.png', text: 'Paste', action: 'paste'}},
{condition: canMoveToParent, menuItem: {img: 'images/iconMoveToParent.png', text: 'Move to parent group', action: 'moveIntoParent'}},
+ {condition: canDeleteFlowFiles, menuItem: {img: 'images/iconDelete.png', text: 'Delete Flow Files', action: 'deleteQueueContents'}},
{condition: isDeletable, menuItem: {img: 'images/iconDelete.png', text: 'Delete', action: 'delete'}}
];