You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/03 16:36:31 UTC

[03/40] nifi git commit: NIFI-730: - Starting to add support for deleting flow files from a queue by creating endpoints and starting to wire everything together. - Adding context menu item for initiating the request to drop flow files.

NIFI-730:
- Starting to add support for deleting flow files from a queue by creating endpoints and starting to wire everything together.
- Adding context menu item for initiating the request to drop flow files.

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e0ac7cde
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e0ac7cde
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e0ac7cde

Branch: refs/heads/NIFI-274
Commit: e0ac7cde372f428b0655465b7adc59ad41f8f270
Parents: b4bfcc1
Author: Matt Gilman <ma...@gmail.com>
Authored: Mon Oct 12 10:00:54 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Mon Oct 12 10:00:54 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/web/api/dto/DropRequestDTO.java | 129 +++++++++++
 .../nifi/web/api/entity/DropRequestEntity.java  |  44 ++++
 .../org/apache/nifi/web/NiFiServiceFacade.java  |  25 +++
 .../nifi/web/StandardNiFiServiceFacade.java     |  24 ++
 .../apache/nifi/web/api/ConnectionResource.java | 217 ++++++++++++++++++-
 .../org/apache/nifi/web/dao/ConnectionDAO.java  |  22 ++
 .../web/dao/impl/StandardConnectionDAO.java     |  15 ++
 .../src/main/webapp/js/nf/canvas/nf-actions.js  |  58 +++++
 .../main/webapp/js/nf/canvas/nf-context-menu.js |  10 +
 9 files changed, 542 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java
new file mode 100644
index 0000000..dd4289f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto;
+
+import com.wordnik.swagger.annotations.ApiModelProperty;
+import java.util.Date;
+import javax.xml.bind.annotation.XmlType;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.web.api.dto.util.TimestampAdapter;
+
+/**
+ * A request to drop the contents of a connection.
+ */
+@XmlType(name = "dropRequest")
+public class DropRequestDTO {
+
+    private String id;
+    private String uri;
+
+    private Date submissionTime;
+    private Date expiration;
+
+    private Integer percentCompleted;
+    private Boolean finished;
+
+    /**
+     * The id for this component.
+     *
+     * @return The id
+     */
+    @ApiModelProperty(
+            value = "The id of the component."
+    )
+    public String getId() {
+        return this.id;
+    }
+
+    public void setId(final String id) {
+        this.id = id;
+    }
+
+    /**
+     * The uri for linking to this component in this NiFi.
+     *
+     * @return The uri
+     */
+    @ApiModelProperty(
+            value = "The URI for futures requests to the component."
+    )
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    /**
+     * @return time the query was submitted
+     */
+    @XmlJavaTypeAdapter(TimestampAdapter.class)
+    @ApiModelProperty(
+            value = "The timestamp when the query was submitted."
+    )
+    public Date getSubmissionTime() {
+        return submissionTime;
+    }
+
+    public void setSubmissionTime(Date submissionTime) {
+        this.submissionTime = submissionTime;
+    }
+
+    /**
+     * @return expiration time of the query results
+     */
+    @XmlJavaTypeAdapter(TimestampAdapter.class)
+    @ApiModelProperty(
+            value = "The timestamp when the query will expire."
+    )
+    public Date getExpiration() {
+        return expiration;
+    }
+
+    public void setExpiration(Date expiration) {
+        this.expiration = expiration;
+    }
+
+    /**
+     * @return percent completed
+     */
+    @ApiModelProperty(
+            value = "The current percent complete."
+    )
+    public Integer getPercentCompleted() {
+        return percentCompleted;
+    }
+
+    public void setPercentCompleted(Integer percentCompleted) {
+        this.percentCompleted = percentCompleted;
+    }
+
+    /**
+     * @return whether the query has finished
+     */
+    @ApiModelProperty(
+            value = "Whether the query has finished."
+    )
+    public Boolean isFinished() {
+        return finished;
+    }
+
+    public void setFinished(Boolean finished) {
+        this.finished = finished;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java
new file mode 100644
index 0000000..078c019
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.web.api.dto.DropRequestDTO;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a DropRequestDTO.
+ */
+@XmlRootElement(name = "dropRequestEntity")
+public class DropRequestEntity extends Entity {
+
+    private DropRequestDTO dropRequest;
+
+    /**
+     * The DropRequestDTO that is being serialized.
+     *
+     * @return The DropRequestDTO object
+     */
+    public DropRequestDTO getDropRequest() {
+        return dropRequest;
+    }
+
+    public void setDropRequest(DropRequestDTO dropRequest) {
+        this.dropRequest = dropRequest;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index c98b1e4..28f6b61 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -43,6 +43,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
 import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
@@ -525,6 +526,30 @@ public interface NiFiServiceFacade {
      */
     ConfigurationSnapshot<Void> deleteConnection(Revision revision, String groupId, String connectionId);
 
+    /**
+     * Creates a new flow file drop request.
+     * 
+     * @param groupId group
+     * @param connectionId The ID of the connection
+     * @return 
+     */
+    DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId);
+    
+    /**
+     * Gets the specified flow file drop request.
+     * 
+     * @param dropRequestId The flow file drop request
+     * @return The DropRequest
+     */
+    DropRequestDTO getFlowFileDropRequest(String dropRequestId);
+    
+    /**
+     * Cancels/removes the specified flow file drop request.
+     * 
+     * @param dropRequestId The flow file drop request
+     */
+    void deleteFlowFileDropRequest(String dropRequestId);
+    
     // ----------------------------------------
     // InputPort methods
     // ----------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 2286213..7f0a296 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -162,6 +162,7 @@ import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
 import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
@@ -809,6 +810,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public void deleteFlowFileDropRequest(String dropRequestId) {
+        // TODO
+    }
+
+    @Override
     public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) {
         return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
             @Override
@@ -1060,6 +1066,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId) {
+        // TODO
+        final DropRequestDTO dto = new DropRequestDTO();
+        dto.setFinished(false);
+        dto.setSubmissionTime(new Date());
+        dto.setExpiration(new Date(System.currentTimeMillis() + 10000));
+        dto.setId(UUID.randomUUID().toString());
+        dto.setPercentCompleted(100);
+        return dto;
+    }
+
+    @Override
     public ConfigurationSnapshot<ProcessorDTO> createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
         return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
             @Override
@@ -2092,6 +2110,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public DropRequestDTO getFlowFileDropRequest(String dropRequestId) {
+        // TODO
+        return null;
+    }
+
+    @Override
     public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) {
         return controllerFacade.getConnectionStatusHistory(groupId, connectionId);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index 64c14fa..dfc20fb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -24,6 +24,7 @@ import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -49,6 +50,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
 
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.util.NiFiProperties;
@@ -71,6 +73,10 @@ import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.context.ClusterContext;
+import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PreAuthorize;
@@ -476,8 +482,7 @@ public class ConnectionResource extends ApplicationResource {
             @ApiParam(
                     value = "The connection configuration details.",
                     required = true
-            )
-            ConnectionEntity connectionEntity) {
+            ) ConnectionEntity connectionEntity) {
 
         if (connectionEntity == null || connectionEntity.getConnection() == null) {
             throw new IllegalArgumentException("Connection details must be specified.");
@@ -886,6 +891,214 @@ public class ConnectionResource extends ApplicationResource {
         return clusterContext(generateOkResponse(entity)).build();
     }
 
+    /**
+     * Drops the flowfiles in the queue of the specified connection.
+     *
+     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+     * @param id The id of the connection
+     * @return A dropRequestEntity
+     */
+    @DELETE
+    @Consumes(MediaType.WILDCARD)
+    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("/{connection-id}/contents")
+    @PreAuthorize("hasRole('ROLE_DFM')")
+    @ApiOperation(
+            value = "Drops the contents of the queue in this connection.",
+            response = DropRequestEntity.class,
+            authorizations = {
+                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+            }
+    )
+    @ApiResponses(
+            value = {
+                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response dropQueueContents(
+            @ApiParam(
+                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+                    required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+            @ApiParam(
+                    value = "The connection id.",
+                    required = true
+            )
+            @PathParam("connection-id") String id) {
+
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+        }
+
+        // ensure the id is the same across the cluster
+        final String dropRequestId;
+        final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
+        if (clusterContext != null) {
+            dropRequestId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString();
+        } else {
+            dropRequestId = UUID.randomUUID().toString();
+        }
+
+        // submit the drop request
+        final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id);
+        dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequestId));
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // create the response entity
+        final DropRequestEntity entity = new DropRequestEntity();
+        entity.setRevision(revision);
+        entity.setDropRequest(dropRequest);
+
+        // generate the URI where the response will be
+        final URI location = URI.create(dropRequest.getUri());
+        if (dropRequest.isFinished()) {
+            return generateCreatedResponse(location, entity).build();
+        } else {
+            return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
+        }
+    }
+
+    /**
+     * Checks the status of an outstanding drop request.
+     *
+     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+     * @param connectionId The id of the connection
+     * @param dropRequestId The id of the drop request
+     * @return A dropRequestEntity
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("/{connection-id}/contents/drop-requests/{drop-request-id}")
+    @PreAuthorize("hasRole('ROLE_DFM')")
+    @ApiOperation(
+            value = "Gets the current status of a drop request for the specified connection.",
+            response = DropRequestEntity.class,
+            authorizations = {
+                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+            }
+    )
+    @ApiResponses(
+            value = {
+                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response getDropRequest(
+            @ApiParam(
+                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+                    required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+            @ApiParam(
+                    value = "The connection id.",
+                    required = true
+            )
+            @PathParam("connection-id") String connectionId,
+            @ApiParam(
+                    value = "The drop request id.",
+                    required = true
+            )
+            @PathParam("drop-request-id") String dropRequestId) {
+
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+        }
+
+        // get the drop request
+        final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(dropRequestId);
+        dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId));
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // create the response entity
+        final DropRequestEntity entity = new DropRequestEntity();
+        entity.setRevision(revision);
+        entity.setDropRequest(dropRequest);
+
+        return generateOkResponse(entity).build();
+    }
+
+    /**
+     * Deletes the specified drop request.
+     * 
+     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+     * @param connectionId The connection id
+     * @param dropRequestId The drop request id
+     * @return A dropRequestEntity
+     */
+    @DELETE
+    @Consumes(MediaType.WILDCARD)
+    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("/{connection-id}/contents/drop-requests/{drop-request-id}")
+    @PreAuthorize("hasRole('ROLE_DFM')")
+    @ApiOperation(
+            value = "Cancels and/or removes a request drop of the contents in this connection.",
+            response = DropRequestEntity.class,
+            authorizations = {
+                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+            }
+    )
+    @ApiResponses(
+            value = {
+                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response removeDropRequest(
+            @ApiParam(
+                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+                    required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+            @ApiParam(
+                    value = "The connection id.",
+                    required = true
+            )
+            @PathParam("connection-id") String connectionId,
+            @ApiParam(
+                    value = "The drop request id.",
+                    required = true
+            )
+            @PathParam("drop-request-id") String dropRequestId) {
+
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+        }
+
+        // delete the drop request
+        serviceFacade.deleteFlowFileDropRequest(dropRequestId);
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // create the response entity
+        final DropRequestEntity entity = new DropRequestEntity();
+        entity.setRevision(revision);
+
+        return generateOkResponse(entity).build();
+    }
+
     // setters
     public void setServiceFacade(NiFiServiceFacade serviceFacade) {
         this.serviceFacade = serviceFacade;

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
index e0fb89e..ce1d1fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
@@ -32,6 +32,13 @@ public interface ConnectionDAO {
     Connection getConnection(String groupId, String id);
 
     /**
+     * Gets the specified flow file drop request.
+     * 
+     * @param dropRequestId The drop request id
+     */
+    void getFlowFileDropRequest(String dropRequestId);
+    
+    /**
      * Gets the connections for the specified source processor.
      *
      * @param groupId group id
@@ -67,6 +74,14 @@ public interface ConnectionDAO {
     Connection createConnection(String groupId, ConnectionDTO connectionDTO);
 
     /**
+     * Creates a new flow file drop request.
+     * 
+     * @param groupId group id
+     * @param id connection id
+     */
+    void createFileFlowDropRequest(String groupId, String id);
+    
+    /**
      * Verifies the create request can be processed.
      *
      * @param groupId group id
@@ -106,4 +121,11 @@ public interface ConnectionDAO {
      * @param id The id of the connection
      */
     void deleteConnection(String groupId, String id);
+    
+    /**
+     * Deletes the specified flow file drop request.
+     * 
+     * @param dropRequestId The drop request id
+     */
+    void deleteFlowFileDropRequest(String dropRequestId);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index 5fbc393..8fa9d3b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -69,6 +69,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
     }
 
     @Override
+    public void getFlowFileDropRequest(String dropRequestId) {
+        // TODO
+    }
+
+    @Override
     public Set<Connection> getConnectionsForSource(final String groupId, final String processorId) {
         final Set<Connection> connections = new HashSet<>(getConnections(groupId));
         for (final Iterator<Connection> connectionIter = connections.iterator(); connectionIter.hasNext();) {
@@ -294,6 +299,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
     }
 
     @Override
+    public void createFileFlowDropRequest(String groupId, String id) {
+        // TODO
+    }
+
+    @Override
     public void verifyCreate(String groupId, ConnectionDTO connectionDTO) {
         // validate the incoming request
         final List<String> validationErrors = validateProposedConfiguration(groupId, connectionDTO);
@@ -464,6 +474,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
         group.removeConnection(connection);
     }
 
+    @Override
+    public void deleteFlowFileDropRequest(String dropRequestId) {
+        // TODO
+    }
+
     /* setters */
     public void setFlowController(final FlowController flowController) {
         this.flowController = flowController;

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
index 3b47a8d..bab2236 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
@@ -846,6 +846,64 @@ nf.Actions = (function () {
         },
         
         /**
+         * Deletes the flow files in the specified connection.
+         * 
+         * @param {type} selection
+         */
+        deleteQueueContents: function (selection) {
+            if (selection.size() !== 1 || !nf.CanvasUtils.isConnection(selection)) {
+                return;
+            }
+
+            // process the drop request
+            var processDropRequest = function (dropRequest, nextDelay) {
+                // see if the drop request has completed
+                if (dropRequest.finished === true) {
+                    deleteDropRequest(dropRequest);
+                } else {
+                    schedule(dropRequest, nextDelay);
+                }
+            };
+
+            // schedule for the next poll iteration
+            var schedule = function (dropRequest, delay) {
+                setTimeout(function () {
+                    $.ajax({
+                        type: 'GET',
+                        url: dropRequest.uri,
+                        dataType: 'json'
+                    }).done(function(response) {
+                        var dropRequest = response.dropRequest;
+                        processDropRequest(dropRequest, Math.min(8, delay * 2));
+                    }).fail(nf.Common.handleAjaxError);
+                }, delay * 1000);
+            };
+            
+            // delete the drop request
+            var deleteDropRequest = function (dropRequest) {
+                $.ajax({
+                    type: 'DELETE',
+                    url: dropRequest.uri,
+                    dataType: 'json'
+                }).done(function() {
+                    // drop request has been deleted
+                }).fail(nf.Common.handleAjaxError);
+            };
+            
+            // get the connection data
+            var connection = selection.datum();
+            
+            // issue the request to delete the flow files
+            $.ajax({
+                type: 'DELETE',
+                url: connection.component.uri + '/contents',
+                dataType: 'json'
+            }).done(function(response) {
+                processDropRequest(response.dropRequest, 1);
+            }).fail(nf.Common.handleAjaxError);
+        },
+        
+        /**
          * Opens the fill color dialog for the component in the specified selection.
          * 
          * @param {type} selection      The selection

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0ac7cde/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
index e652dd4..58397d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
@@ -278,6 +278,15 @@ nf.ContextMenu = (function () {
     };
     
     /**
+     * Only DFMs can delete flow files from a connection.
+     * 
+     * @param {selection} selection
+     */
+    var canDeleteFlowFiles = function (selection) {
+        return nf.Common.isDFM() && isConnection(selection);
+    };
+    
+    /**
      * Determines if the components in the specified selection can be moved into a parent group.
      * 
      * @param {type} selection
@@ -373,6 +382,7 @@ nf.ContextMenu = (function () {
         {condition: isCopyable, menuItem: {img: 'images/iconCopy.png', text: 'Copy', action: 'copy'}},
         {condition: isPastable, menuItem: {img: 'images/iconPaste.png', text: 'Paste', action: 'paste'}},
         {condition: canMoveToParent, menuItem: {img: 'images/iconMoveToParent.png', text: 'Move to parent group', action: 'moveIntoParent'}},
+        {condition: canDeleteFlowFiles, menuItem: {img: 'images/iconDelete.png', text: 'Delete Flow Files', action: 'deleteQueueContents'}},
         {condition: isDeletable, menuItem: {img: 'images/iconDelete.png', text: 'Delete', action: 'delete'}}
     ];