You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/08/07 14:23:27 UTC

[nifi] branch main updated: NIFI-7663 Added option for emptying all queues in a process group. Available from context menu.

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ef566f  NIFI-7663 Added option for emptying all queues in a process group. Available from context menu.
3ef566f is described below

commit 3ef566f7da5db4a63ecff2b704ebd872dc070542
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Thu Jul 23 16:36:28 2020 +0200

    NIFI-7663 Added option for emptying all queues in a process group. Available from context menu.
    
    NIFI-7663 Minor changes (variable name refactor, javadoc, GUI message). Merging Drop All Flowfiles responses across all nodes in a cluster.
    
    NIFI-7663 Reloading the canvas after completing a Drop All Flowfiles request.
    
    NIFI-7663 Fixed typos.
    
    This closes #4425.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../http/StandardHttpResponseMapper.java           |   2 +
 .../DropAllFlowFilesRequestEndpointMerger.java     |  37 ++++
 .../java/org/apache/nifi/groups/ProcessGroup.java  |  36 ++++
 .../apache/nifi/groups/StandardProcessGroup.java   |  94 +++++++++
 .../controller/service/mock/MockProcessGroup.java  |  16 ++
 .../nifi/integration/FrameworkIntegrationTest.java |  10 +-
 .../processgroup/StandardProcessGroupIT.java       | 105 +++++++++
 .../org/apache/nifi/web/NiFiServiceFacade.java     |  27 +++
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  15 ++
 .../apache/nifi/web/api/ProcessGroupResource.java  | 235 +++++++++++++++++++++
 .../org/apache/nifi/web/dao/ProcessGroupDAO.java   |  27 +++
 .../nifi/web/dao/impl/StandardProcessGroupDAO.java |  30 +++
 .../src/main/webapp/js/nf/canvas/nf-actions.js     | 178 ++++++++++++++++
 .../main/webapp/js/nf/canvas/nf-context-menu.js    |   2 +
 14 files changed, 811 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
index 027ded4..c087f9c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
@@ -32,6 +32,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEnd
 import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.CurrentUserEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.DropAllFlowFilesRequestEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.FlowConfigurationEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.FlowMerger;
@@ -135,6 +136,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper {
         endpointMergers.add(new ReportingTaskEndpointMerger());
         endpointMergers.add(new ReportingTasksEndpointMerger());
         endpointMergers.add(new DropRequestEndpointMerger());
+        endpointMergers.add(new DropAllFlowFilesRequestEndpointMerger());
         endpointMergers.add(new ListFlowFilesEndpointMerger());
         endpointMergers.add(new ComponentStateEndpointMerger());
         endpointMergers.add(new BulletinBoardEndpointMerger());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropAllFlowFilesRequestEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropAllFlowFilesRequestEndpointMerger.java
new file mode 100644
index 0000000..8900aa4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropAllFlowFilesRequestEndpointMerger.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.regex.Pattern;
+
+public class DropAllFlowFilesRequestEndpointMerger extends DropRequestEndpointMerger {
+    public static final Pattern GET_DELETE_URI = Pattern.compile("/nifi-api/process-groups/[a-f0-9\\-]{36}/empty-all-connections-requests/[a-f0-9\\-]{36}");
+    public static final Pattern POST_URI = Pattern.compile("/nifi-api/process-groups/[a-f0-9\\-]{36}/empty-all-connections-requests");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && GET_DELETE_URI.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if (("POST".equalsIgnoreCase(method) && POST_URI.matcher(uri.getPath()).matches())) {
+            return true;
+        }
+
+        return false;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index 93f3e38..2656792 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -31,6 +31,7 @@ import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.Triggerable;
 import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.parameter.ParameterContext;
@@ -485,6 +486,41 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
     List<Connection> findAllConnections();
 
     /**
+     * Initiates a request to drop all FlowFiles in all connections under this process group (recursively).
+     * This method returns a DropFlowFileStatus that can be used to determine the current state of the request.
+     * Additionally, the DropFlowFileStatus provides a request identifier that can then be
+     * passed to the {@link #getDropAllFlowFilesStatus(String)} and {@link #cancelDropAllFlowFiles(String)}
+     * methods in order to obtain the status later or cancel a request.
+     *
+     * @param requestIdentifier the identifier of the Drop All FlowFiles Request
+     * @param requestor the entity that is requesting that the FlowFiles be dropped; this will be
+     *            included in the Provenance Events that are generated.
+     *
+     * @return the status of the drop request, or <code>null</code> if there is no
+     *         connection in the process group.
+     */
+    DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String requestor);
+
+    /**
+     * Returns the current status of a Drop All FlowFiles Request that was initiated via the
+     * {@link #dropAllFlowFiles(String, String)} method with the given identifier.
+     *
+     * @param requestIdentifier the identifier of the Drop All FlowFiles Request
+     * @return the status for the request with the given identifier, or <code>null</code> if no
+     *         request status exists with that identifier
+     */
+    DropFlowFileStatus getDropAllFlowFilesStatus(String requestIdentifier);
+
+    /**
+     * Cancels the request to drop all FlowFiles that has the given identifier.
+     *
+     * @param requestIdentifier the identifier of the Drop All FlowFiles Request
+     * @return the status for the request with the given identifier after it has been canceled, or <code>null</code> if no
+     *         request status exists with that identifier
+     */
+    DropFlowFileStatus cancelDropAllFlowFiles(String requestIdentifier);
+
+    /**
      * @param id of the Funnel
      * @return the Funnel with the given ID, if it exists as a child or
      * descendant of this ProcessGroup. This performs a recursive search of all
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 99282c7..0bd0299 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -56,9 +56,13 @@ import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.DropFlowFileRequest;
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.LoadBalanceCompression;
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -143,6 +147,7 @@ import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -155,6 +160,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -166,6 +172,13 @@ import java.util.stream.Collectors;
 import static java.util.Objects.requireNonNull;
 
 public final class StandardProcessGroup implements ProcessGroup {
+    public static final List<DropFlowFileState> AGGREGATE_DROP_FLOW_FILE_STATE_PRECEDENCES = Arrays.asList(
+        DropFlowFileState.FAILURE,
+        DropFlowFileState.CANCELED,
+        DropFlowFileState.DROPPING_FLOWFILES,
+        DropFlowFileState.WAITING_FOR_LOCK,
+        DropFlowFileState.COMPLETE
+    );
 
     private final String id;
     private final AtomicReference<ProcessGroup> parent;
@@ -1260,6 +1273,87 @@ public final class StandardProcessGroup implements ProcessGroup {
         return findAllConnections(this);
     }
 
+    @Override
+    public DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String requestor) {
+        return handleDropAllFlowFiles(requestIdentifier, queue -> queue.dropFlowFiles(requestIdentifier, requestor));
+    }
+
+    @Override
+    public DropFlowFileStatus getDropAllFlowFilesStatus(String requestIdentifier) {
+        return handleDropAllFlowFiles(requestIdentifier, queue -> queue.getDropFlowFileStatus(requestIdentifier));
+    }
+
+    @Override
+    public DropFlowFileStatus cancelDropAllFlowFiles(String requestIdentifier) {
+        return handleDropAllFlowFiles(requestIdentifier, queue -> queue.cancelDropFlowFileRequest(requestIdentifier));
+    }
+
+    private DropFlowFileStatus handleDropAllFlowFiles(String dropRequestId, Function<FlowFileQueue, DropFlowFileStatus> function) {
+        DropFlowFileStatus resultDropFlowFileStatus;
+
+        List<Connection> connections = findAllConnections(this);
+
+        DropFlowFileRequest aggregateDropFlowFileStatus = new DropFlowFileRequest(dropRequestId);
+        aggregateDropFlowFileStatus.setState(null);
+
+        AtomicBoolean processedAtLeastOne = new AtomicBoolean(false);
+
+        connections.stream()
+            .map(Connection::getFlowFileQueue)
+            .map(function::apply)
+            .forEach(additionalDropFlowFileStatus -> {
+                aggregate(aggregateDropFlowFileStatus, additionalDropFlowFileStatus);
+                processedAtLeastOne.set(true);
+            });
+
+        if (processedAtLeastOne.get()) {
+            resultDropFlowFileStatus = aggregateDropFlowFileStatus;
+        } else {
+            resultDropFlowFileStatus = null;
+        }
+
+        return resultDropFlowFileStatus;
+    }
+
+    private void aggregate(DropFlowFileRequest aggregateDropFlowFileStatus, DropFlowFileStatus additionalDropFlowFileStatus) {
+        QueueSize aggregateOriginalSize = aggregate(aggregateDropFlowFileStatus.getOriginalSize(), additionalDropFlowFileStatus.getOriginalSize());
+        QueueSize aggregateDroppedSize = aggregate(aggregateDropFlowFileStatus.getDroppedSize(), additionalDropFlowFileStatus.getDroppedSize());
+        QueueSize aggregateCurrentSize = aggregate(aggregateDropFlowFileStatus.getCurrentSize(), additionalDropFlowFileStatus.getCurrentSize());
+        DropFlowFileState aggregateState = aggregate(aggregateDropFlowFileStatus.getState(), additionalDropFlowFileStatus.getState());
+
+        aggregateDropFlowFileStatus.setOriginalSize(aggregateOriginalSize);
+        aggregateDropFlowFileStatus.setDroppedSize(aggregateDroppedSize);
+        aggregateDropFlowFileStatus.setCurrentSize(aggregateCurrentSize);
+        aggregateDropFlowFileStatus.setState(aggregateState);
+    }
+
+    private QueueSize aggregate(QueueSize size1, QueueSize size2) {
+        int objectsNr = Optional.ofNullable(size1)
+            .map(size -> size.getObjectCount() + size2.getObjectCount())
+            .orElse(size2.getObjectCount());
+
+        long sizeByte = Optional.ofNullable(size1)
+            .map(size -> size.getByteCount() + size2.getByteCount())
+            .orElse(size2.getByteCount());
+
+        QueueSize aggregateSize = new QueueSize(objectsNr, sizeByte);
+
+        return aggregateSize;
+    }
+
+    private DropFlowFileState aggregate(DropFlowFileState state1, DropFlowFileState state2) {
+        DropFlowFileState aggregateState = DropFlowFileState.DROPPING_FLOWFILES;
+
+        for (DropFlowFileState aggregateDropFlowFileStatePrecedence : AGGREGATE_DROP_FLOW_FILE_STATE_PRECEDENCES) {
+            if (state1 == aggregateDropFlowFileStatePrecedence || state2 == aggregateDropFlowFileStatePrecedence) {
+                aggregateState = aggregateDropFlowFileStatePrecedence;
+                break;
+            }
+        }
+
+        return aggregateState;
+    }
+
     private List<Connection> findAllConnections(final ProcessGroup group) {
         final List<Connection> connections = new ArrayList<>(group.getConnections());
         for (final ProcessGroup childGroup : group.getProcessGroups()) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index c212ad1..15613af 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -31,6 +31,7 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.groups.BatchCounts;
 import org.apache.nifi.groups.DataValve;
@@ -356,6 +357,21 @@ public class MockProcessGroup implements ProcessGroup {
     }
 
     @Override
+    public DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String requestor) {
+        return null;
+    }
+
+    @Override
+    public DropFlowFileStatus getDropAllFlowFilesStatus(String requestIdentifier) {
+        return null;
+    }
+
+    @Override
+    public DropFlowFileStatus cancelDropAllFlowFiles(String requestIdentifier) {
+        return null;
+    }
+
+    @Override
     public Funnel findFunnel(final String id) {
         return null;
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index ca053a0..52d2b39 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -468,6 +468,10 @@ public class FrameworkIntegrationTest {
     }
 
     protected final Connection connect(final ProcessorNode source, final ProcessorNode destination, final Collection<Relationship> relationships) {
+        return connect(rootProcessGroup, source, destination, relationships);
+    }
+
+    protected final Connection connect(ProcessGroup processGroup, final ProcessorNode source, final ProcessorNode destination, final Collection<Relationship> relationships) {
         final String id = UUID.randomUUID().toString();
         final Connection connection = new StandardConnection.Builder(processScheduler)
             .source(source)
@@ -480,7 +484,7 @@ public class FrameworkIntegrationTest {
 
         source.addConnection(connection);
         destination.addConnection(connection);
-        rootProcessGroup.addConnection(connection);
+        processGroup.addConnection(connection);
 
         return connection;
     }
@@ -491,11 +495,11 @@ public class FrameworkIntegrationTest {
             throw new IllegalStateException("Processor is invalid: " + procNode + ": " + procNode.getValidationErrors());
         }
 
-        return rootProcessGroup.startProcessor(procNode, true);
+        return procNode.getProcessGroup().startProcessor(procNode, true);
     }
 
     protected final Future<Void> stop(final ProcessorNode procNode) {
-        return rootProcessGroup.stopProcessor(procNode);
+        return procNode.getProcessGroup().stopProcessor(procNode);
     }
 
     protected final FlowFileQueue getDestinationQueue(final ProcessorNode procNode, final Relationship relationship) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
index 15dc5a8..2e0f8e4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
@@ -18,8 +18,11 @@ package org.apache.nifi.integration.processgroup;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.integration.FrameworkIntegrationTest;
@@ -34,11 +37,113 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class StandardProcessGroupIT extends FrameworkIntegrationTest {
+    @Test
+    public void testDropAllFlowFilesFromOneConnection() throws Exception {
+        ProcessorNode sourceProcessGroup = createGenerateProcessor(1);
+        ProcessorNode destinationProcessGroup = createProcessorNode((context, session) -> {});
+
+        Connection connection = connect(sourceProcessGroup, destinationProcessGroup, Collections.singleton(REL_SUCCESS));
+
+        for (int i = 0; i < 5; i++) {
+            triggerOnce(sourceProcessGroup);
+        }
+        assertEquals(5, connection.getFlowFileQueue().size().getObjectCount());
+
+        // WHEN
+        DropFlowFileStatus dropFlowFileStatus = getRootGroup().dropAllFlowFiles("requestId", "unimportant");
+        while (dropFlowFileStatus.getState() != DropFlowFileState.COMPLETE) {
+            TimeUnit.MILLISECONDS.sleep(10);
+            dropFlowFileStatus = getRootGroup().getDropAllFlowFilesStatus("requestId");
+        }
+
+        // THEN
+        assertEquals(0, connection.getFlowFileQueue().size().getObjectCount());
+    }
+
+    @Test
+    public void testDropAllFlowFilesFromOneConnectionInChildProcessGroup() throws Exception {
+        ProcessGroup childProcessGroup = getFlowController().getFlowManager().createProcessGroup("childProcessGroup");
+        childProcessGroup.setName("ChildProcessGroup");
+        getRootGroup().addProcessGroup(childProcessGroup);
+
+        ProcessorNode sourceProcessGroup = createGenerateProcessor(1);
+        moveProcessor(sourceProcessGroup, childProcessGroup);
+
+        ProcessorNode destinationProcessGroup = createProcessorNode((context, session) -> {});
+        moveProcessor(destinationProcessGroup, childProcessGroup);
+
+        Connection connection = connect(childProcessGroup, sourceProcessGroup, destinationProcessGroup, Collections.singleton(REL_SUCCESS));
+
+        for (int i = 0; i < 5; i++) {
+            triggerOnce(sourceProcessGroup);
+        }
+        assertEquals(5, connection.getFlowFileQueue().size().getObjectCount());
+
+        // WHEN
+        DropFlowFileStatus dropFlowFileStatus = getRootGroup().dropAllFlowFiles("requestId", "unimportant");
+        while (dropFlowFileStatus.getState() != DropFlowFileState.COMPLETE) {
+            TimeUnit.MILLISECONDS.sleep(10);
+            dropFlowFileStatus = childProcessGroup.getDropAllFlowFilesStatus("requestId");
+        }
+
+        // THEN
+        assertEquals(0, connection.getFlowFileQueue().size().getObjectCount());
+    }
+
+    @Test
+    public void testDropAllFlowFilesFromMultipleConnections() throws Exception {
+        ProcessGroup childProcessGroup = getFlowController().getFlowManager().createProcessGroup("childProcessGroup");
+        childProcessGroup.setName("ChildProcessGroup");
+        getRootGroup().addProcessGroup(childProcessGroup);
+
+        ProcessorNode sourceProcessGroup1 = createGenerateProcessor(4);
+        moveProcessor(sourceProcessGroup1, childProcessGroup);
+
+        ProcessorNode sourceProcessGroup2 = createGenerateProcessor(5);
+        moveProcessor(sourceProcessGroup2, childProcessGroup);
+
+        ProcessorNode destinationProcessGroup = createProcessorNode((context, session) -> {});
+        moveProcessor(destinationProcessGroup, childProcessGroup);
+
+        Connection connection1 = connect(childProcessGroup, sourceProcessGroup1, destinationProcessGroup, Collections.singleton(REL_SUCCESS));
+        Connection connection2 = connect(childProcessGroup, sourceProcessGroup2, destinationProcessGroup, Collections.singleton(REL_SUCCESS));
+
+        for (int i = 0; i < 5; i++) {
+            triggerOnce(sourceProcessGroup1);
+        }
+        assertEquals(5, connection1.getFlowFileQueue().size().getObjectCount());
+
+        for (int i = 0; i < 10; i++) {
+            triggerOnce(sourceProcessGroup2);
+        }
+        assertEquals(10, connection2.getFlowFileQueue().size().getObjectCount());
+
+        // WHEN
+        DropFlowFileStatus dropFlowFileStatus = getRootGroup().dropAllFlowFiles("requestId", "unimportant");
+        while (dropFlowFileStatus.getState() != DropFlowFileState.COMPLETE) {
+            TimeUnit.MILLISECONDS.sleep(10);
+            dropFlowFileStatus = childProcessGroup.getDropAllFlowFilesStatus("requestId");
+        }
+
+        // THEN
+        assertEquals(5 + 10, dropFlowFileStatus.getOriginalSize().getObjectCount());
+        assertEquals(20 + 50, dropFlowFileStatus.getOriginalSize().getByteCount());
+
+        assertEquals(0, dropFlowFileStatus.getCurrentSize().getObjectCount());
+        assertEquals(0, dropFlowFileStatus.getCurrentSize().getByteCount());
+
+        assertEquals(5 + 10, dropFlowFileStatus.getDroppedSize().getObjectCount());
+        assertEquals(20 + 50, dropFlowFileStatus.getDroppedSize().getByteCount());
+
+        assertEquals(0, connection1.getFlowFileQueue().size().getObjectCount());
+        assertEquals(0, connection2.getFlowFileQueue().size().getObjectCount());
+    }
 
     @Test
     public void testComponentsAffectedByVariableOverridden() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 4f3ffa6..19969b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1209,6 +1209,33 @@ public interface NiFiServiceFacade {
     void verifyDeleteProcessGroup(String groupId);
 
     /**
+     * Creates a request to drop flowfiles in all connections in a process group (recursively).
+     *
+     * @param processGroupId The ID of the process group
+     * @param dropRequestId The ID of the drop request
+     * @return The DropRequest
+     */
+    DropRequestDTO createDropAllFlowFilesInProcessGroup(final String processGroupId, final String dropRequestId);
+
+    /**
+     * Gets the specified request for dropping all flowfiles in a process group (recursively).
+     *
+     * @param processGroupId The ID of the process group
+     * @param dropRequestId The ID of the drop request
+     * @return The DropRequest
+     */
+    DropRequestDTO getDropAllFlowFilesRequest(final String processGroupId, final String dropRequestId);
+
+    /**
+     * Cancels/removes the specified request for dropping all flowfiles in a process group (recursively).
+     *
+     * @param processGroupId The ID of the process group
+     * @param dropRequestId The ID of the drop request
+     * @return The DropRequest
+     */
+    DropRequestDTO deleteDropAllFlowFilesRequest(String processGroupId, String dropRequestId);
+
+    /**
      * Deletes the specified process group.
      *
      * @param revision Revision to compare with current base revision
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index da34a38..55865f1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -1955,6 +1955,21 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public DropRequestDTO createDropAllFlowFilesInProcessGroup(final String processGroupId, final String dropRequestId) {
+        return dtoFactory.createDropRequestDTO(processGroupDAO.createDropAllFlowFilesRequest(processGroupId, dropRequestId));
+    }
+
+    @Override
+    public DropRequestDTO getDropAllFlowFilesRequest(final String processGroupId, final String dropRequestId) {
+        return dtoFactory.createDropRequestDTO(processGroupDAO.getDropAllFlowFilesRequest(processGroupId, dropRequestId));
+    }
+
+    @Override
+    public DropRequestDTO deleteDropAllFlowFilesRequest(String processGroupId, String dropRequestId) {
+        return dtoFactory.createDropRequestDTO(processGroupDAO.deleteDropAllFlowFilesRequest(processGroupId, dropRequestId));
+    }
+
+    @Override
     public ProcessGroupEntity deleteProcessGroup(final Revision revision, final String groupId) {
         final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
         final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index dadfd69..6557161 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -31,6 +31,7 @@ import org.apache.nifi.authorization.AuthorizeAccess;
 import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
 import org.apache.nifi.authorization.AuthorizeParameterReference;
 import org.apache.nifi.authorization.ComponentAuthorizable;
+import org.apache.nifi.authorization.ConnectionAuthorizable;
 import org.apache.nifi.authorization.ProcessGroupAuthorizable;
 import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.SnippetAuthorizable;
@@ -65,6 +66,7 @@ import org.apache.nifi.web.api.dto.AffectedComponentDTO;
 import org.apache.nifi.web.api.dto.BundleDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.PortDTO;
 import org.apache.nifi.web.api.dto.PositionDTO;
@@ -87,6 +89,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
 import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
 import org.apache.nifi.web.api.entity.Entity;
 import org.apache.nifi.web.api.entity.FlowComparisonEntity;
 import org.apache.nifi.web.api.entity.FlowEntity;
@@ -155,9 +158,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -1594,6 +1599,218 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
     }
 
     /**
+     * Creates a request to drop the flowfiles from all connection queues within a process group (recursively).
+     *
+     * @param httpServletRequest request
+     * @param processGroupId     The id of the process group whose connection queues will be emptied.
+     * @return A dropRequestEntity describing the newly created drop request.
+     */
+    @POST
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/empty-all-connections-requests")
+    @ApiOperation(
+        value = "Creates a request to drop all flowfiles of all connection queues in this process group.",
+        response = DropRequestEntity.class,
+        authorizations = {
+            @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups"),
+            @Authorization(value = "Write Source Data - /data/{component-type}/{uuid} - For all encapsulated connections")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 202, message = "The request has been accepted. An HTTP response header will contain the URI where the status can be polled."),
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response createEmptyAllConnectionsRequest(
+        @Context final HttpServletRequest httpServletRequest,
+        @ApiParam(
+            value = "The process group id.",
+            required = true
+        )
+        @PathParam("id") final String processGroupId
+    ) {
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.POST);
+        }
+
+        final ProcessGroupEntity requestProcessGroupEntity = new ProcessGroupEntity();
+        requestProcessGroupEntity.setId(processGroupId);
+
+        return withWriteLock(
+            serviceFacade,
+            requestProcessGroupEntity,
+            lookup -> authorizeHandleDropAllFlowFilesRequest(processGroupId, lookup),
+            null,
+            (processGroupEntity) -> {
+                // ensure the id is the same across the cluster
+                final String dropRequestId = generateUuid();
+
+                // submit the drop request
+                final DropRequestDTO dropRequest = serviceFacade.createDropAllFlowFilesInProcessGroup(processGroupEntity.getId(), dropRequestId);
+                dropRequest.setUri(generateResourceUri("process-groups", processGroupEntity.getId(), "empty-all-connections-requests", dropRequest.getId()));
+
+                // create the response entity
+                final DropRequestEntity entity = new DropRequestEntity();
+                entity.setDropRequest(dropRequest);
+
+                // generate the URI where the response will be
+                final URI location = URI.create(dropRequest.getUri());
+                return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
+            }
+        );
+    }
+
+    /**
+     * Returns the current status of a previously submitted request for dropping all flowfiles within a process group.
+     *
+     * @param processGroupId  The id of the process group the request was created for
+     * @param dropRequestId   The id of the drop request to look up
+     * @return A dropRequestEntity
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/empty-all-connections-requests/{drop-request-id}")
+    @ApiOperation(
+        value = "Gets the current status of a drop all flowfiles request.",
+        response = DropRequestEntity.class,
+        authorizations = {
+            @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups"),
+            @Authorization(value = "Write Source Data - /data/{component-type}/{uuid} - For all encapsulated connections")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response getDropAllFlowfilesRequest(
+        @ApiParam(
+            value = "The process group id.",
+            required = true
+        )
+        @PathParam("id") final String processGroupId,
+        @ApiParam(
+            value = "The drop request id.",
+            required = true
+        )
+        @PathParam("drop-request-id") final String dropRequestId
+    ) {
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        // polling is authorized with the same check that guards creating the request
+        serviceFacade.authorizeAccess(authLookup -> authorizeHandleDropAllFlowFilesRequest(processGroupId, authLookup));
+
+        // fetch the status and attach the URI under which it can be polled
+        final DropRequestDTO statusDto = serviceFacade.getDropAllFlowFilesRequest(processGroupId, dropRequestId);
+        statusDto.setUri(generateResourceUri("process-groups", processGroupId, "empty-all-connections-requests", statusDto.getId()));
+
+        // wrap the status in the response entity
+        final DropRequestEntity responseEntity = new DropRequestEntity();
+        responseEntity.setDropRequest(statusDto);
+
+        return generateOkResponse(responseEntity).build();
+    }
+
+    /**
+     * Cancels and/or removes an outstanding request for dropping all flowfiles within a process group.
+     *
+     * @param httpServletRequest request
+     * @param processGroupId     The id of the process group the request was created for
+     * @param dropRequestId      The id of the drop request to cancel
+     * @return A dropRequestEntity
+     */
+    @DELETE
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/empty-all-connections-requests/{drop-request-id}")
+    @ApiOperation(
+        value = "Cancels and/or removes a request to drop all flowfiles.",
+        response = DropRequestEntity.class,
+        authorizations = {
+            @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups"),
+            @Authorization(value = "Write Source Data - /data/{component-type}/{uuid} - For all encapsulated connections")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response removeDropRequest(
+        @Context final HttpServletRequest httpServletRequest,
+        @ApiParam(
+            value = "The process group id.",
+            required = true
+        )
+        @PathParam("id") final String processGroupId,
+        @ApiParam(
+            value = "The drop request id.",
+            required = true
+        )
+        @PathParam("drop-request-id") final String dropRequestId
+    ) {
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.DELETE);
+        }
+
+        return withWriteLock(
+            serviceFacade,
+            new DropEntity(processGroupId, dropRequestId),
+            authLookup -> authorizeHandleDropAllFlowFilesRequest(processGroupId, authLookup),
+            null,
+            (requestEntity) -> {
+                // delete the drop request and attach the URI of the removed resource
+                final DropRequestDTO removedRequest = serviceFacade.deleteDropAllFlowFilesRequest(requestEntity.getEntityId(), requestEntity.getDropRequestId());
+                removedRequest.setUri(generateResourceUri("process-groups", requestEntity.getEntityId(), "empty-all-connections-requests", removedRequest.getId()));
+
+                // wrap the final status in the response entity
+                final DropRequestEntity responseEntity = new DropRequestEntity();
+                responseEntity.setDropRequest(removedRequest);
+
+                return generateOkResponse(responseEntity).build();
+            }
+        );
+    }
+
+    private void authorizeHandleDropAllFlowFilesRequest(String processGroupId, AuthorizableLookup lookup) {
+        final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId);
+        // work queue of groups still to be authorized, seeded with the requested group
+        final Queue<ProcessGroupAuthorizable> processGroupQueue = new LinkedList<>();
+        processGroupQueue.add(processGroup);
+
+        while (!processGroupQueue.isEmpty()) {
+            final ProcessGroupAuthorizable currentProcessGroupAuthorizable = processGroupQueue.remove();
+            // the user must be allowed to READ every group that is visited
+            authorizeProcessGroup(currentProcessGroupAuthorizable, authorizer, lookup, RequestAction.READ, false, false, false, false, false);
+            // ...and to WRITE the source data of every connection the group encapsulates
+            final Set<ConnectionAuthorizable> connections = currentProcessGroupAuthorizable.getEncapsulatedConnections();
+            for (final ConnectionAuthorizable connection : connections) {
+                final Authorizable dataAuthorizable = connection.getSourceData();
+                dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+            }
+
+            processGroupQueue.addAll(currentProcessGroupAuthorizable.getEncapsulatedProcessGroups());
+        }
+    }
+
+    /**
      * Removes the specified process group reference.
      *
      * @param httpServletRequest request
@@ -4231,4 +4448,22 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
     public void setControllerServiceResource(ControllerServiceResource controllerServiceResource) {
         this.controllerServiceResource = controllerServiceResource;
     }
+
+    private static class DropEntity extends Entity { // pairs a component id with a drop request id for withWriteLock
+        private final String entityId;
+        private final String dropRequestId;
+
+        public DropEntity(String entityId, String dropRequestId) {
+            this.entityId = entityId;
+            this.dropRequestId = dropRequestId;
+        }
+
+        public String getEntityId() {
+            return entityId;
+        }
+
+        public String getDropRequestId() {
+            return dropRequestId;
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
index ffc3134..5fb44ec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.web.dao;
 
 import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
@@ -188,6 +189,32 @@ public interface ProcessGroupDAO {
     void verifyDeleteFlowRegistry(String registryId);
 
     /**
+     * Creates a request to drop the flowfiles from all connections within the process group (recursively).
+     *
+     * @param processGroupId The id of the process group
+     * @param dropRequestId The id of the drop request to create
+     * @return The drop request status
+     */
+    DropFlowFileStatus createDropAllFlowFilesRequest(String processGroupId, String dropRequestId);
+
+    /**
+     * Gets the status of the specified request for dropping all flowfiles in all connections (recursively).
+     * @param processGroupId The id of the process group
+     * @param dropRequestId The id of the drop request
+     * @return The drop request status
+     */
+    DropFlowFileStatus getDropAllFlowFilesRequest(String processGroupId, String dropRequestId);
+
+    /**
+     * Cancels and/or removes the specified request for dropping all flowfiles in all connections (recursively).
+     *
+     * @param processGroupId The id of the process group
+     * @param dropRequestId The id of the drop request
+     * @return The drop request status
+     */
+    DropFlowFileStatus deleteDropAllFlowFilesRequest(String processGroupId, String dropRequestId);
+
+    /**
      * Deletes the specified process group.
      *
      * @param groupId The process group id
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index ba2c6c2..f088472 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.web.dao.impl;
 
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
@@ -23,6 +25,7 @@ import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.FlowFileConcurrency;
@@ -45,6 +48,7 @@ import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity;
 import org.apache.nifi.web.api.entity.VariableEntity;
 import org.apache.nifi.web.dao.ProcessGroupDAO;
 
+import javax.ws.rs.WebApplicationException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -477,6 +481,32 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
     }
 
     @Override
+    public DropFlowFileStatus createDropAllFlowFilesRequest(String processGroupId, String dropRequestId) {
+        final ProcessGroup group = locateProcessGroup(flowController, processGroupId);
+        // the identity of the requesting user is handed to the group together with the request id
+        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
+        if (currentUser == null) {
+            throw new WebApplicationException(new Throwable("Unable to access details for current user."));
+        }
+
+        return group.dropAllFlowFiles(dropRequestId, currentUser.getIdentity());
+    }
+
+    @Override
+    public DropFlowFileStatus getDropAllFlowFilesRequest(String processGroupId, String dropRequestId) {
+        final ProcessGroup group = locateProcessGroup(flowController, processGroupId);
+        // ask the located group for the status of the given drop request
+        return group.getDropAllFlowFilesStatus(dropRequestId);
+    }
+
+    @Override
+    public DropFlowFileStatus deleteDropAllFlowFilesRequest(String processGroupId, String dropRequestId) {
+        final ProcessGroup group = locateProcessGroup(flowController, processGroupId);
+        // deleting the request is implemented as cancelling it on the located group
+        return group.cancelDropAllFlowFiles(dropRequestId);
+    }
+
+    @Override
     public void deleteProcessGroup(String processGroupId) {
         // get the group
         ProcessGroup group = locateProcessGroup(flowController, processGroupId);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
index e71fde4..5d9be82 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
@@ -1270,6 +1270,184 @@
             });
         },
 
+        emptyAllQueues: function (selection) {
+            // after confirmation, empties every queue of the selected process group (or of the current group when nothing is selected)
+            nfDialog.showYesNoDialog({
+                headerText: 'Empty All Queues',
+                dialogContent: 'Are you sure you want to empty all queues in this Process Group? All FlowFiles from all connections waiting at the time of the request will be removed.',
+                noText: 'Cancel',
+                yesText: 'Empty All',
+                yesHandler: function () {
+                    var processGroupId;
+                    if (selection.empty()) {
+                        processGroupId = nfCanvasUtils.getGroupId();
+                    } else {
+                        processGroupId = selection.datum().id;
+                    }
+
+                    var MAX_DELAY = 4;
+                    var cancelled = false;
+                    var dropRequest = null;
+                    var dropRequestTimer = null;
+
+                    // updates the progress bar
+                    var updateProgress = function (percentComplete) {
+                        // remove existing labels
+                        var progressBar = $('#drop-request-percent-complete');
+                        progressBar.find('div.progress-label').remove();
+                        progressBar.find('md-progress-linear').remove();
+
+                        // update the progress bar
+                        var label = $('<div class="progress-label"></div>').text(percentComplete + '%');
+                        (nfNgBridge.injector.get('$compile')($('<md-progress-linear ng-cloak ng-value="' + percentComplete + '" class="md-hue-2" md-mode="determinate" aria-label="Drop request percent complete"></md-progress-linear>'))(nfNgBridge.rootScope)).appendTo(progressBar);
+                        progressBar.append(label);
+                    };
+
+                    // update the button model of the drop request status dialog
+                    $('#drop-request-status-dialog').modal('setButtonModel', [{
+                        buttonText: 'Stop',
+                        color: {
+                            base: '#728E9B',
+                            hover: '#004849',
+                            text: '#ffffff'
+                        },
+                        handler: {
+                            click: function () {
+                                cancelled = true;
+
+                                // we are waiting for the next poll attempt
+                                if (dropRequestTimer !== null) {
+                                    // cancel it and forget the handle so that a repeated click does not cancel twice
+                                    clearTimeout(dropRequestTimer);
+                                    dropRequestTimer = null;
+                                    // cancel the drop request
+                                    completeDropRequest();
+                                }
+                            }
+                        }
+                    }]);
+
+                    // completes the drop request by removing it and showing how many flowfiles were deleted
+                    var completeDropRequest = function () {
+                        nfCanvasUtils.reload();
+
+                        // clean up as appropriate
+                        if (nfCommon.isDefinedAndNotNull(dropRequest)) {
+                            $.ajax({
+                                type: 'DELETE',
+                                url: dropRequest.uri,
+                                dataType: 'json'
+                            }).done(function (response) {
+                                // report the results of this drop request
+                                dropRequest = response.dropRequest;
+
+                                // build the results
+                                var droppedTokens = dropRequest.dropped.split(/ \/ /);
+                                var results = $('<div></div>');
+                                $('<span class="label"></span>').text(droppedTokens[0]).appendTo(results);
+                                $('<span></span>').text(' FlowFiles (' + droppedTokens[1] + ')').appendTo(results);
+
+                                // if the request did not complete, include the original
+                                if (dropRequest.percentCompleted < 100) {
+                                    var originalTokens = dropRequest.original.split(/ \/ /);
+                                    $('<span class="label"></span>').text(' out of ' + originalTokens[0]).appendTo(results);
+                                    $('<span></span>').text(' (' + originalTokens[1] + ')').appendTo(results);
+                                }
+                                $('<span></span>').text(' were removed from the queues.').appendTo(results);
+
+                                // if this request failed, show the error
+                                if (nfCommon.isDefinedAndNotNull(dropRequest.failureReason)) {
+                                    $('<br/><br/><span></span>').text(dropRequest.failureReason).appendTo(results);
+                                }
+
+                                // display the results
+                                nfDialog.showOkDialog({
+                                    headerText: 'Empty All Queues',
+                                    dialogContent: results
+                                });
+                            }).always(function () {
+                                $('#drop-request-status-dialog').modal('hide');
+                            });
+                        } else {
+                            // nothing was removed
+                            nfDialog.showOkDialog({
+                                headerText: 'Empty All Queues',
+                                dialogContent: 'No FlowFiles were removed.'
+                            });
+
+                            // close the dialog
+                            $('#drop-request-status-dialog').modal('hide');
+                        }
+                    };
+
+                    // process the drop request
+                    var processDropRequest = function (delay) {
+                        // update the percent complete
+                        updateProgress(dropRequest.percentCompleted);
+
+                        // update the status of the drop request
+                        $('#drop-request-status-message').text(dropRequest.state);
+
+                        // complete the request if it has finished or the user cancelled it
+                        if (dropRequest.finished === true || cancelled === true) {
+                            completeDropRequest();
+                        } else {
+                            // wait delay to poll again
+                            dropRequestTimer = setTimeout(function () {
+                                // clear the drop request timer
+                                dropRequestTimer = null;
+
+                                // poll now; the following poll waits twice as long, capped at MAX_DELAY seconds
+                                pollDropRequest(Math.min(MAX_DELAY, delay * 2));
+                            }, delay * 1000);
+                        }
+                    };
+
+                    // polls the status of the drop request; nextDelay is the wait (in seconds) before the following poll
+                    var pollDropRequest = function (nextDelay) {
+                        $.ajax({
+                            type: 'GET',
+                            url: dropRequest.uri,
+                            dataType: 'json'
+                        }).done(function (response) {
+                            dropRequest = response.dropRequest;
+                            processDropRequest(nextDelay);
+                        }).fail(function (xhr, status, error) {
+                            if (xhr.status === 403) {
+                                nfErrorHandler.handleAjaxError(xhr, status, error);
+                            } else {
+                                completeDropRequest();
+                            }
+                        });
+                    };
+
+                    // issue the request to delete the flow files
+                    $.ajax({
+                        type: 'POST',
+                        url: '../nifi-api/process-groups/' + encodeURIComponent(processGroupId) + '/empty-all-connections-requests',
+                        dataType: 'json',
+                        contentType: 'application/json'
+                    }).done(function (response) {
+                        // initialize the progress bar value
+                        updateProgress(0);
+
+                        // show the progress dialog
+                        $('#drop-request-status-dialog').modal('show');
+
+                        // process the drop request
+                        dropRequest = response.dropRequest;
+                        processDropRequest(1);
+                    }).fail(function (xhr, status, error) {
+                        if (xhr.status === 403) {
+                            nfErrorHandler.handleAjaxError(xhr, status, error);
+                        } else {
+                            completeDropRequest();
+                        }
+                    });
+                }
+            });
+        },
+
         /**
          * Lists the flow files in the specified connection.
          *
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
index 2699f6d..915139e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
@@ -850,6 +850,8 @@
         {id: 'paste-menu-item', condition: isPastable, menuItem: {clazz: 'fa fa-paste', text: 'Paste', action: 'paste'}},
         {separator: true},
         {id: 'empty-queue-menu-item', condition: canEmptyQueue, menuItem: {clazz: 'fa fa-minus-circle', text: 'Empty queue', action: 'emptyQueue'}},
+        {id: 'empty-all-queues-menu-item', condition: isProcessGroup, menuItem: {clazz: 'fa fa-minus-circle', text: 'Empty all queues', action: 'emptyAllQueues'}},
+        {id: 'empty-all-queues-menu-item-noselection', condition: emptySelection, menuItem: {clazz: 'fa fa-minus-circle', text: 'Empty all queues', action: 'emptyAllQueues'}},
         {id: 'delete-menu-item', condition: isDeletable, menuItem: {clazz: 'fa fa-trash', text: 'Delete', action: 'delete'}}
     ];