You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/08/07 14:23:27 UTC
[nifi] branch main updated: NIFI-7663 Added option for emptying all
queues in a process group. Available from context menu.
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3ef566f NIFI-7663 Added option for emptying all queues in a process group. Available from context menu.
3ef566f is described below
commit 3ef566f7da5db4a63ecff2b704ebd872dc070542
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Thu Jul 23 16:36:28 2020 +0200
NIFI-7663 Added option for emptying all queues in a process group. Available from context menu.
NIFI-7663 Minor changes (variable name refactor, javadoc, GUI message). Merging Drop All Flowfiles responses across all nodes in a cluster.
NIFI-7663 Reloading the canvas after completing a Drop All Flowfiles request.
NIFI-7663 Fixed typos.
This closes #4425.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../http/StandardHttpResponseMapper.java | 2 +
.../DropAllFlowFilesRequestEndpointMerger.java | 37 ++++
.../java/org/apache/nifi/groups/ProcessGroup.java | 36 ++++
.../apache/nifi/groups/StandardProcessGroup.java | 94 +++++++++
.../controller/service/mock/MockProcessGroup.java | 16 ++
.../nifi/integration/FrameworkIntegrationTest.java | 10 +-
.../processgroup/StandardProcessGroupIT.java | 105 +++++++++
.../org/apache/nifi/web/NiFiServiceFacade.java | 27 +++
.../apache/nifi/web/StandardNiFiServiceFacade.java | 15 ++
.../apache/nifi/web/api/ProcessGroupResource.java | 235 +++++++++++++++++++++
.../org/apache/nifi/web/dao/ProcessGroupDAO.java | 27 +++
.../nifi/web/dao/impl/StandardProcessGroupDAO.java | 30 +++
.../src/main/webapp/js/nf/canvas/nf-actions.js | 178 ++++++++++++++++
.../main/webapp/js/nf/canvas/nf-context-menu.js | 2 +
14 files changed, 811 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
index 027ded4..c087f9c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
@@ -32,6 +32,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEnd
import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.CurrentUserEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.DropAllFlowFilesRequestEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.FlowConfigurationEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.FlowMerger;
@@ -135,6 +136,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper {
endpointMergers.add(new ReportingTaskEndpointMerger());
endpointMergers.add(new ReportingTasksEndpointMerger());
endpointMergers.add(new DropRequestEndpointMerger());
+ endpointMergers.add(new DropAllFlowFilesRequestEndpointMerger());
endpointMergers.add(new ListFlowFilesEndpointMerger());
endpointMergers.add(new ComponentStateEndpointMerger());
endpointMergers.add(new BulletinBoardEndpointMerger());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropAllFlowFilesRequestEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropAllFlowFilesRequestEndpointMerger.java
new file mode 100644
index 0000000..8900aa4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropAllFlowFilesRequestEndpointMerger.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.regex.Pattern;
+
+public class DropAllFlowFilesRequestEndpointMerger extends DropRequestEndpointMerger {
+ public static final Pattern GET_DELETE_URI = Pattern.compile("/nifi-api/process-groups/[a-f0-9\\-]{36}/empty-all-connections-requests/[a-f0-9\\-]{36}");
+ public static final Pattern POST_URI = Pattern.compile("/nifi-api/process-groups/[a-f0-9\\-]{36}/empty-all-connections-requests");
+
+ @Override
+ public boolean canHandle(URI uri, String method) {
+ if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && GET_DELETE_URI.matcher(uri.getPath()).matches()) {
+ return true;
+ } else if (("POST".equalsIgnoreCase(method) && POST_URI.matcher(uri.getPath()).matches())) {
+ return true;
+ }
+
+ return false;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index 93f3e38..2656792 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -31,6 +31,7 @@ import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.Triggerable;
import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.parameter.ParameterContext;
@@ -485,6 +486,41 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
List<Connection> findAllConnections();
/**
+ * Initiates a request to drop all FlowFiles in all connections under this process group (recursively).
+ * This method returns a DropFlowFileStatus that can be used to determine the current state of the request.
+ * Additionally, the DropFlowFileStatus provides a request identifier that can then be
+ * passed to the {@link #getDropAllFlowFilesStatus(String)} and {@link #cancelDropAllFlowFiles(String)}
+ * methods in order to obtain the status later or cancel a request
+ *
+ * @param requestIdentifier the identifier of the Drop All FlowFiles Request
+ * @param requestor the entity that is requesting that the FlowFiles be dropped; this will be
+ * included in the Provenance Events that are generated.
+ *
+ * @return the status of the drop request, or <code>null</code> if there is no
+ * connection in the process group.
+ */
+ DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String requestor);
+
+ /**
+ * Returns the current status of a Drop All FlowFiles Request that was initiated via the
+ * {@link #dropAllFlowFiles(String, String)} method with the given identifier
+ *
+ * @param requestIdentifier the identifier of the Drop All FlowFiles Request
+ * @return the status for the request with the given identifier, or <code>null</code> if no
+ * request status exists with that identifier
+ */
+ DropFlowFileStatus getDropAllFlowFilesStatus(String requestIdentifier);
+
+ /**
+ * Cancels the request to drop all FlowFiles that has the given identifier.
+ *
+ * @param requestIdentifier the identifier of the Drop All FlowFiles Request
+ * @return the status for the request with the given identifier after it has been canceled, or <code>null</code> if no
+ * request status exists with that identifier
+ */
+ DropFlowFileStatus cancelDropAllFlowFiles(String requestIdentifier);
+
+ /**
* @param id of the Funnel
* @return the Funnel with the given ID, if it exists as a child or
* descendant of this ProcessGroup. This performs a recursive search of all
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 99282c7..0bd0299 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -56,9 +56,13 @@ import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.DropFlowFileRequest;
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -143,6 +147,7 @@ import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -155,6 +160,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -166,6 +172,13 @@ import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
public final class StandardProcessGroup implements ProcessGroup {
+ public static final List<DropFlowFileState> AGGREGATE_DROP_FLOW_FILE_STATE_PRECEDENCES = Arrays.asList(
+ DropFlowFileState.FAILURE,
+ DropFlowFileState.CANCELED,
+ DropFlowFileState.DROPPING_FLOWFILES,
+ DropFlowFileState.WAITING_FOR_LOCK,
+ DropFlowFileState.COMPLETE
+ );
private final String id;
private final AtomicReference<ProcessGroup> parent;
@@ -1260,6 +1273,87 @@ public final class StandardProcessGroup implements ProcessGroup {
return findAllConnections(this);
}
+ @Override
+ public DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String requestor) {
+ return handleDropAllFlowFiles(requestIdentifier, queue -> queue.dropFlowFiles(requestIdentifier, requestor));
+ }
+
+ @Override
+ public DropFlowFileStatus getDropAllFlowFilesStatus(String requestIdentifier) {
+ return handleDropAllFlowFiles(requestIdentifier, queue -> queue.getDropFlowFileStatus(requestIdentifier));
+ }
+
+ @Override
+ public DropFlowFileStatus cancelDropAllFlowFiles(String requestIdentifier) {
+ return handleDropAllFlowFiles(requestIdentifier, queue -> queue.cancelDropFlowFileRequest(requestIdentifier));
+ }
+
+ private DropFlowFileStatus handleDropAllFlowFiles(String dropRequestId, Function<FlowFileQueue, DropFlowFileStatus> function) {
+ DropFlowFileStatus resultDropFlowFileStatus;
+
+ List<Connection> connections = findAllConnections(this);
+
+ DropFlowFileRequest aggregateDropFlowFileStatus = new DropFlowFileRequest(dropRequestId);
+ aggregateDropFlowFileStatus.setState(null);
+
+ AtomicBoolean processedAtLeastOne = new AtomicBoolean(false);
+
+ connections.stream()
+ .map(Connection::getFlowFileQueue)
+ .map(function::apply)
+ .forEach(additionalDropFlowFileStatus -> {
+ aggregate(aggregateDropFlowFileStatus, additionalDropFlowFileStatus);
+ processedAtLeastOne.set(true);
+ });
+
+ if (processedAtLeastOne.get()) {
+ resultDropFlowFileStatus = aggregateDropFlowFileStatus;
+ } else {
+ resultDropFlowFileStatus = null;
+ }
+
+ return resultDropFlowFileStatus;
+ }
+
+ private void aggregate(DropFlowFileRequest aggregateDropFlowFileStatus, DropFlowFileStatus additionalDropFlowFileStatus) {
+ QueueSize aggregateOriginalSize = aggregate(aggregateDropFlowFileStatus.getOriginalSize(), additionalDropFlowFileStatus.getOriginalSize());
+ QueueSize aggregateDroppedSize = aggregate(aggregateDropFlowFileStatus.getDroppedSize(), additionalDropFlowFileStatus.getDroppedSize());
+ QueueSize aggregateCurrentSize = aggregate(aggregateDropFlowFileStatus.getCurrentSize(), additionalDropFlowFileStatus.getCurrentSize());
+ DropFlowFileState aggregateState = aggregate(aggregateDropFlowFileStatus.getState(), additionalDropFlowFileStatus.getState());
+
+ aggregateDropFlowFileStatus.setOriginalSize(aggregateOriginalSize);
+ aggregateDropFlowFileStatus.setDroppedSize(aggregateDroppedSize);
+ aggregateDropFlowFileStatus.setCurrentSize(aggregateCurrentSize);
+ aggregateDropFlowFileStatus.setState(aggregateState);
+ }
+
+ private QueueSize aggregate(QueueSize size1, QueueSize size2) {
+ int objectsNr = Optional.ofNullable(size1)
+ .map(size -> size.getObjectCount() + size2.getObjectCount())
+ .orElse(size2.getObjectCount());
+
+ long sizeByte = Optional.ofNullable(size1)
+ .map(size -> size.getByteCount() + size2.getByteCount())
+ .orElse(size2.getByteCount());
+
+ QueueSize aggregateSize = new QueueSize(objectsNr, sizeByte);
+
+ return aggregateSize;
+ }
+
+ private DropFlowFileState aggregate(DropFlowFileState state1, DropFlowFileState state2) {
+ DropFlowFileState aggregateState = DropFlowFileState.DROPPING_FLOWFILES;
+
+ for (DropFlowFileState aggregateDropFlowFileStatePrecedence : AGGREGATE_DROP_FLOW_FILE_STATE_PRECEDENCES) {
+ if (state1 == aggregateDropFlowFileStatePrecedence || state2 == aggregateDropFlowFileStatePrecedence) {
+ aggregateState = aggregateDropFlowFileStatePrecedence;
+ break;
+ }
+ }
+
+ return aggregateState;
+ }
+
private List<Connection> findAllConnections(final ProcessGroup group) {
final List<Connection> connections = new ArrayList<>(group.getConnections());
for (final ProcessGroup childGroup : group.getProcessGroups()) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index c212ad1..15613af 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -31,6 +31,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.groups.BatchCounts;
import org.apache.nifi.groups.DataValve;
@@ -356,6 +357,21 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
+ public DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String requestor) {
+ return null;
+ }
+
+ @Override
+ public DropFlowFileStatus getDropAllFlowFilesStatus(String requestIdentifier) {
+ return null;
+ }
+
+ @Override
+ public DropFlowFileStatus cancelDropAllFlowFiles(String requestIdentifier) {
+ return null;
+ }
+
+ @Override
public Funnel findFunnel(final String id) {
return null;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index ca053a0..52d2b39 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -468,6 +468,10 @@ public class FrameworkIntegrationTest {
}
protected final Connection connect(final ProcessorNode source, final ProcessorNode destination, final Collection<Relationship> relationships) {
+ return connect(rootProcessGroup, source, destination, relationships);
+ }
+
+ protected final Connection connect(ProcessGroup processGroup, final ProcessorNode source, final ProcessorNode destination, final Collection<Relationship> relationships) {
final String id = UUID.randomUUID().toString();
final Connection connection = new StandardConnection.Builder(processScheduler)
.source(source)
@@ -480,7 +484,7 @@ public class FrameworkIntegrationTest {
source.addConnection(connection);
destination.addConnection(connection);
- rootProcessGroup.addConnection(connection);
+ processGroup.addConnection(connection);
return connection;
}
@@ -491,11 +495,11 @@ public class FrameworkIntegrationTest {
throw new IllegalStateException("Processor is invalid: " + procNode + ": " + procNode.getValidationErrors());
}
- return rootProcessGroup.startProcessor(procNode, true);
+ return procNode.getProcessGroup().startProcessor(procNode, true);
}
protected final Future<Void> stop(final ProcessorNode procNode) {
- return rootProcessGroup.stopProcessor(procNode);
+ return procNode.getProcessGroup().stopProcessor(procNode);
}
protected final FlowFileQueue getDestinationQueue(final ProcessorNode procNode, final Relationship relationship) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
index 15dc5a8..2e0f8e4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
@@ -18,8 +18,11 @@ package org.apache.nifi.integration.processgroup;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.integration.FrameworkIntegrationTest;
@@ -34,11 +37,113 @@ import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class StandardProcessGroupIT extends FrameworkIntegrationTest {
+ @Test
+ public void testDropAllFlowFilesFromOneConnection() throws Exception {
+ ProcessorNode sourceProcessGroup = createGenerateProcessor(1);
+ ProcessorNode destinationProcessGroup = createProcessorNode((context, session) -> {});
+
+ Connection connection = connect(sourceProcessGroup, destinationProcessGroup, Collections.singleton(REL_SUCCESS));
+
+ for (int i = 0; i < 5; i++) {
+ triggerOnce(sourceProcessGroup);
+ }
+ assertEquals(5, connection.getFlowFileQueue().size().getObjectCount());
+
+ // WHEN
+ DropFlowFileStatus dropFlowFileStatus = getRootGroup().dropAllFlowFiles("requestId", "unimportant");
+ while (dropFlowFileStatus.getState() != DropFlowFileState.COMPLETE) {
+ TimeUnit.MILLISECONDS.sleep(10);
+ dropFlowFileStatus = getRootGroup().getDropAllFlowFilesStatus("requestId");
+ }
+
+ // THEN
+ assertEquals(0, connection.getFlowFileQueue().size().getObjectCount());
+ }
+
+ @Test
+ public void testDropAllFlowFilesFromOneConnectionInChildProcessGroup() throws Exception {
+ ProcessGroup childProcessGroup = getFlowController().getFlowManager().createProcessGroup("childProcessGroup");
+ childProcessGroup.setName("ChildProcessGroup");
+ getRootGroup().addProcessGroup(childProcessGroup);
+
+ ProcessorNode sourceProcessGroup = createGenerateProcessor(1);
+ moveProcessor(sourceProcessGroup, childProcessGroup);
+
+ ProcessorNode destinationProcessGroup = createProcessorNode((context, session) -> {});
+ moveProcessor(destinationProcessGroup, childProcessGroup);
+
+ Connection connection = connect(childProcessGroup, sourceProcessGroup, destinationProcessGroup, Collections.singleton(REL_SUCCESS));
+
+ for (int i = 0; i < 5; i++) {
+ triggerOnce(sourceProcessGroup);
+ }
+ assertEquals(5, connection.getFlowFileQueue().size().getObjectCount());
+
+ // WHEN
+ DropFlowFileStatus dropFlowFileStatus = getRootGroup().dropAllFlowFiles("requestId", "unimportant");
+ while (dropFlowFileStatus.getState() != DropFlowFileState.COMPLETE) {
+ TimeUnit.MILLISECONDS.sleep(10);
+ dropFlowFileStatus = childProcessGroup.getDropAllFlowFilesStatus("requestId");
+ }
+
+ // THEN
+ assertEquals(0, connection.getFlowFileQueue().size().getObjectCount());
+ }
+
+ @Test
+ public void testDropAllFlowFilesFromMultipleConnections() throws Exception {
+ ProcessGroup childProcessGroup = getFlowController().getFlowManager().createProcessGroup("childProcessGroup");
+ childProcessGroup.setName("ChildProcessGroup");
+ getRootGroup().addProcessGroup(childProcessGroup);
+
+ ProcessorNode sourceProcessGroup1 = createGenerateProcessor(4);
+ moveProcessor(sourceProcessGroup1, childProcessGroup);
+
+ ProcessorNode sourceProcessGroup2 = createGenerateProcessor(5);
+ moveProcessor(sourceProcessGroup2, childProcessGroup);
+
+ ProcessorNode destinationProcessGroup = createProcessorNode((context, session) -> {});
+ moveProcessor(destinationProcessGroup, childProcessGroup);
+
+ Connection connection1 = connect(childProcessGroup, sourceProcessGroup1, destinationProcessGroup, Collections.singleton(REL_SUCCESS));
+ Connection connection2 = connect(childProcessGroup, sourceProcessGroup2, destinationProcessGroup, Collections.singleton(REL_SUCCESS));
+
+ for (int i = 0; i < 5; i++) {
+ triggerOnce(sourceProcessGroup1);
+ }
+ assertEquals(5, connection1.getFlowFileQueue().size().getObjectCount());
+
+ for (int i = 0; i < 10; i++) {
+ triggerOnce(sourceProcessGroup2);
+ }
+ assertEquals(10, connection2.getFlowFileQueue().size().getObjectCount());
+
+ // WHEN
+ DropFlowFileStatus dropFlowFileStatus = getRootGroup().dropAllFlowFiles("requestId", "unimportant");
+ while (dropFlowFileStatus.getState() != DropFlowFileState.COMPLETE) {
+ TimeUnit.MILLISECONDS.sleep(10);
+ dropFlowFileStatus = childProcessGroup.getDropAllFlowFilesStatus("requestId");
+ }
+
+ // THEN
+ assertEquals(5 + 10, dropFlowFileStatus.getOriginalSize().getObjectCount());
+ assertEquals(20 + 50, dropFlowFileStatus.getOriginalSize().getByteCount());
+
+ assertEquals(0, dropFlowFileStatus.getCurrentSize().getObjectCount());
+ assertEquals(0, dropFlowFileStatus.getCurrentSize().getByteCount());
+
+ assertEquals(5 + 10, dropFlowFileStatus.getDroppedSize().getObjectCount());
+ assertEquals(20 + 50, dropFlowFileStatus.getDroppedSize().getByteCount());
+
+ assertEquals(0, connection1.getFlowFileQueue().size().getObjectCount());
+ assertEquals(0, connection2.getFlowFileQueue().size().getObjectCount());
+ }
@Test
public void testComponentsAffectedByVariableOverridden() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 4f3ffa6..19969b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1209,6 +1209,33 @@ public interface NiFiServiceFacade {
void verifyDeleteProcessGroup(String groupId);
/**
+ * Creates a request to drop flowfiles in all connections in a process group (recursively).
+ *
+ * @param processGroupId The ID of the process group
+ * @param dropRequestId The ID of the drop request
+ * @return The DropRequest
+ */
+ DropRequestDTO createDropAllFlowFilesInProcessGroup(final String processGroupId, final String dropRequestId);
+
+ /**
+ * Gets the specified request for dropping all flowfiles in a process group (recursively).
+ *
+ * @param processGroupId The ID of the process group
+ * @param dropRequestId The ID of the drop request
+ * @return The DropRequest
+ */
+ DropRequestDTO getDropAllFlowFilesRequest(final String processGroupId, final String dropRequestId);
+
+ /**
+ * Cancels/removes the specified request for dropping all flowfiles in a process group (recursively).
+ *
+ * @param processGroupId The ID of the process group
+ * @param dropRequestId The ID of the drop request
+ * @return The DropRequest
+ */
+ DropRequestDTO deleteDropAllFlowFilesRequest(String processGroupId, String dropRequestId);
+
+ /**
* Deletes the specified process group.
*
* @param revision Revision to compare with current base revision
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index da34a38..55865f1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -1955,6 +1955,21 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public DropRequestDTO createDropAllFlowFilesInProcessGroup(final String processGroupId, final String dropRequestId) {
+ return dtoFactory.createDropRequestDTO(processGroupDAO.createDropAllFlowFilesRequest(processGroupId, dropRequestId));
+ }
+
+ @Override
+ public DropRequestDTO getDropAllFlowFilesRequest(final String processGroupId, final String dropRequestId) {
+ return dtoFactory.createDropRequestDTO(processGroupDAO.getDropAllFlowFilesRequest(processGroupId, dropRequestId));
+ }
+
+ @Override
+ public DropRequestDTO deleteDropAllFlowFilesRequest(String processGroupId, String dropRequestId) {
+ return dtoFactory.createDropRequestDTO(processGroupDAO.deleteDropAllFlowFilesRequest(processGroupId, dropRequestId));
+ }
+
+ @Override
public ProcessGroupEntity deleteProcessGroup(final Revision revision, final String groupId) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index dadfd69..6557161 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -31,6 +31,7 @@ import org.apache.nifi.authorization.AuthorizeAccess;
import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
import org.apache.nifi.authorization.AuthorizeParameterReference;
import org.apache.nifi.authorization.ComponentAuthorizable;
+import org.apache.nifi.authorization.ConnectionAuthorizable;
import org.apache.nifi.authorization.ProcessGroupAuthorizable;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.SnippetAuthorizable;
@@ -65,6 +66,7 @@ import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
@@ -87,6 +89,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
@@ -155,9 +158,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
@@ -1594,6 +1599,218 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
}
/**
+ * Creates a request to drop the flowfiles from all connection queues within a process group (recursively).
+ *
+ * @param httpServletRequest request
+ * @param processGroupId The id of the process group to be removed.
+ * @return A dropRequestEntity.
+ */
+ @POST
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/empty-all-connections-requests")
+ @ApiOperation(
+ value = "Creates a request to drop all flowfiles of all connection queues in this process group.",
+ response = ProcessGroupEntity.class,
+ authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups"),
+ @Authorization(value = "Write Source Data - /data/{component-type}/{uuid} - For all encapsulated connections")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 202, message = "The request has been accepted. An HTTP response header will contain the URI where the status can be polled."),
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response createEmptyAllConnectionsRequest(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "The process group id.",
+ required = true
+ )
+ @PathParam("id") final String processGroupId
+ ) {
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.POST);
+ }
+
+ final ProcessGroupEntity requestProcessGroupEntity = new ProcessGroupEntity();
+ requestProcessGroupEntity.setId(processGroupId);
+
+ return withWriteLock(
+ serviceFacade,
+ requestProcessGroupEntity,
+ lookup -> authorizeHandleDropAllFlowFilesRequest(processGroupId, lookup),
+ null,
+ (processGroupEntity) -> {
+ // ensure the id is the same across the cluster
+ final String dropRequestId = generateUuid();
+
+ // submit the drop request
+ final DropRequestDTO dropRequest = serviceFacade.createDropAllFlowFilesInProcessGroup(processGroupEntity.getId(), dropRequestId);
+ dropRequest.setUri(generateResourceUri("process-groups", processGroupEntity.getId(), "empty-all-connections-requests", dropRequest.getId()));
+
+ // create the response entity
+ final DropRequestEntity entity = new DropRequestEntity();
+ entity.setDropRequest(dropRequest);
+
+ // generate the URI where the response will be
+ final URI location = URI.create(dropRequest.getUri());
+ return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
+ }
+ );
+ }
+
+ /**
+ * Checks the status of an outstanding request for dropping all flowfiles within a process group.
+ *
+ * @param processGroupId The id of the process group
+ * @param dropRequestId The id of the drop request
+ * @return A dropRequestEntity
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/empty-all-connections-requests/{drop-request-id}")
+ @ApiOperation(
+ value = "Gets the current status of a drop all flowfiles request.",
+ response = DropRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups"),
+ @Authorization(value = "Write Source Data - /data/{component-type}/{uuid} - For all encapsulated connections")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response getDropAllFlowfilesRequest(
+ @ApiParam(
+ value = "The process group id.",
+ required = true
+ )
+ @PathParam("id") final String processGroupId,
+ @ApiParam(
+ value = "The drop request id.",
+ required = true
+ )
+ @PathParam("drop-request-id") final String dropRequestId
+ ) {
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ // authorize access
+ serviceFacade.authorizeAccess(lookup -> authorizeHandleDropAllFlowFilesRequest(processGroupId, lookup));
+
+ // get the drop request
+ final DropRequestDTO dropRequest = serviceFacade.getDropAllFlowFilesRequest(processGroupId, dropRequestId);
+ dropRequest.setUri(generateResourceUri("process-groups", processGroupId, "empty-all-connections-requests", dropRequest.getId()));
+
+ // create the response entity
+ final DropRequestEntity entity = new DropRequestEntity();
+ entity.setDropRequest(dropRequest);
+
+ return generateOkResponse(entity).build();
+ }
+
+ /**
+ * Cancels the specified request for dropping all flowfiles within a process group.
+ *
+ * @param httpServletRequest request
+ * @param processGroupId The process group id
+ * @param dropRequestId The drop request id
+ * @return A dropRequestEntity
+ */
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/empty-all-connections-requests/{drop-request-id}")
+ @ApiOperation(
+ value = "Cancels and/or removes a request to drop all flowfiles.",
+ response = DropRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups"),
+ @Authorization(value = "Write Source Data - /data/{component-type}/{uuid} - For all encapsulated connections")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response removeDropRequest(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "The process group id.",
+ required = true
+ )
+ @PathParam("id") final String processGroupId,
+ @ApiParam(
+ value = "The drop request id.",
+ required = true
+ )
+ @PathParam("drop-request-id") final String dropRequestId
+ ) {
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.DELETE);
+ }
+
+ return withWriteLock(
+ serviceFacade,
+ new DropEntity(processGroupId, dropRequestId),
+ lookup -> authorizeHandleDropAllFlowFilesRequest(processGroupId, lookup),
+ null,
+ (dropEntity) -> {
+ // delete the drop request
+ final DropRequestDTO dropRequest = serviceFacade.deleteDropAllFlowFilesRequest(dropEntity.getEntityId(), dropEntity.getDropRequestId());
+ dropRequest.setUri(generateResourceUri("process-groups", dropEntity.getEntityId(), "empty-all-connections-requests", dropRequest.getId()));
+
+ // create the response entity
+ final DropRequestEntity entity = new DropRequestEntity();
+ entity.setDropRequest(dropRequest);
+
+ return generateOkResponse(entity).build();
+ }
+ );
+ }
+
+ private void authorizeHandleDropAllFlowFilesRequest(String processGroupId, AuthorizableLookup lookup) {
+ final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId);
+
+ Queue<ProcessGroupAuthorizable> processGroupQueue = new LinkedList();
+ processGroupQueue.add(processGroup);
+
+ while (!processGroupQueue.isEmpty()) {
+ ProcessGroupAuthorizable currentProcessGroupAuthorizable = processGroupQueue.remove();
+
+ authorizeProcessGroup(currentProcessGroupAuthorizable, authorizer, lookup, RequestAction.READ, false, false, false, false, false);
+
+ Set<ConnectionAuthorizable> connections = currentProcessGroupAuthorizable.getEncapsulatedConnections();
+ for (ConnectionAuthorizable connection : connections) {
+ Authorizable dataAuthorizable = connection.getSourceData();
+ dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+ }
+
+ processGroupQueue.addAll(currentProcessGroupAuthorizable.getEncapsulatedProcessGroups());
+ }
+ }
+
+ /**
* Removes the specified process group reference.
*
* @param httpServletRequest request
@@ -4231,4 +4448,22 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
public void setControllerServiceResource(ControllerServiceResource controllerServiceResource) {
this.controllerServiceResource = controllerServiceResource;
}
+
+ private static class DropEntity extends Entity {
+ final String entityId;
+ final String dropRequestId;
+
+ public DropEntity(String entityId, String dropRequestId) {
+ this.entityId = entityId;
+ this.dropRequestId = dropRequestId;
+ }
+
+ public String getEntityId() {
+ return entityId;
+ }
+
+ public String getDropRequestId() {
+ return dropRequestId;
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
index ffc3134..5fb44ec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
@@ -17,6 +17,7 @@
package org.apache.nifi.web.dao;
import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
@@ -188,6 +189,32 @@ public interface ProcessGroupDAO {
void verifyDeleteFlowRegistry(String registryId);
/**
+ * Creates a request to drop flowfiles in all connections (recursively).
+ *
+ * @param processGroupId process group id
+ * @param dropRequestId drop request id
+ * @return The drop request status
+ */
+ DropFlowFileStatus createDropAllFlowFilesRequest(String processGroupId, String dropRequestId);
+
+ /**
+ * Gets the specified request for dropping all flowfiles in all connections (recursively).
+ * @param processGroupId The id of the process group
+ * @param dropRequestId The drop request id
+ * @return The drop request status
+ */
+ DropFlowFileStatus getDropAllFlowFilesRequest(String processGroupId, String dropRequestId);
+
+ /**
+ * Deletes the specified request for dropping all flowfiles in all connections (recursively).
+ *
+ * @param processGroupId The id of the process group
+ * @param dropRequestId The drop request id
+ * @return The drop request status
+ */
+ DropFlowFileStatus deleteDropAllFlowFilesRequest(String processGroupId, String dropRequestId);
+
+ /**
* Deletes the specified process group.
*
* @param groupId The process group id
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index ba2c6c2..f088472 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.web.dao.impl;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
@@ -23,6 +25,7 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.FlowFileConcurrency;
@@ -45,6 +48,7 @@ import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity;
import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.dao.ProcessGroupDAO;
+import javax.ws.rs.WebApplicationException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -477,6 +481,32 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
}
@Override
+ public DropFlowFileStatus createDropAllFlowFilesRequest(String processGroupId, String dropRequestId) {
+ ProcessGroup processGroup = locateProcessGroup(flowController, processGroupId);
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ if (user == null) {
+ throw new WebApplicationException(new Throwable("Unable to access details for current user."));
+ }
+
+ return processGroup.dropAllFlowFiles(dropRequestId, user.getIdentity());
+ }
+
+ @Override
+ public DropFlowFileStatus getDropAllFlowFilesRequest(String processGroupId, String dropRequestId) {
+ ProcessGroup processGroup = locateProcessGroup(flowController, processGroupId);
+
+ return processGroup.getDropAllFlowFilesStatus(dropRequestId);
+ }
+
+ @Override
+ public DropFlowFileStatus deleteDropAllFlowFilesRequest(String processGroupId, String dropRequestId) {
+ ProcessGroup processGroup = locateProcessGroup(flowController, processGroupId);
+
+ return processGroup.cancelDropAllFlowFiles(dropRequestId);
+ }
+
+ @Override
public void deleteProcessGroup(String processGroupId) {
// get the group
ProcessGroup group = locateProcessGroup(flowController, processGroupId);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
index e71fde4..5d9be82 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
@@ -1270,6 +1270,184 @@
});
},
+ emptyAllQueues: function (selection) {
+ // prompt the user before emptying the queue
+ nfDialog.showYesNoDialog({
+ headerText: 'Empty All Queues',
+ dialogContent: 'Are you sure you want to empty all queues in this Process Group? All FlowFiles from all connections waiting at the time of the request will be removed.',
+ noText: 'Cancel',
+ yesText: 'Empty All',
+ yesHandler: function () {
+ var processGroupId;
+ if (selection.empty()) {
+ processGroupId = nfCanvasUtils.getGroupId();
+ } else {
+ processGroupId = selection.datum().id;
+ }
+
+ var MAX_DELAY = 4;
+ var cancelled = false;
+ var dropRequest = null;
+ var dropRequestTimer = null;
+
+ // updates the progress bar
+ var updateProgress = function (percentComplete) {
+ // remove existing labels
+ var progressBar = $('#drop-request-percent-complete');
+ progressBar.find('div.progress-label').remove();
+ progressBar.find('md-progress-linear').remove();
+
+ // update the progress bar
+ var label = $('<div class="progress-label"></div>').text(percentComplete + '%');
+ (nfNgBridge.injector.get('$compile')($('<md-progress-linear ng-cloak ng-value="' + percentComplete + '" class="md-hue-2" md-mode="determinate" aria-label="Drop request percent complete"></md-progress-linear>'))(nfNgBridge.rootScope)).appendTo(progressBar);
+ progressBar.append(label);
+ };
+
+ // update the button model of the drop request status dialog
+ $('#drop-request-status-dialog').modal('setButtonModel', [{
+ buttonText: 'Stop',
+ color: {
+ base: '#728E9B',
+ hover: '#004849',
+ text: '#ffffff'
+ },
+ handler: {
+ click: function () {
+ cancelled = true;
+
+ // we are waiting for the next poll attempt
+ if (dropRequestTimer !== null) {
+ // cancel it
+ clearTimeout(dropRequestTimer);
+
+ // cancel the drop request
+ completeDropRequest();
+ }
+ }
+ }
+ }]);
+
+ // completes the drop request by removing it and showing how many flowfiles were deleted
+ var completeDropRequest = function () {
+ nfCanvasUtils.reload();
+
+ // clean up as appropriate
+ if (nfCommon.isDefinedAndNotNull(dropRequest)) {
+ $.ajax({
+ type: 'DELETE',
+ url: dropRequest.uri,
+ dataType: 'json'
+ }).done(function (response) {
+ // report the results of this drop request
+ dropRequest = response.dropRequest;
+
+ // build the results
+ var droppedTokens = dropRequest.dropped.split(/ \/ /);
+ var results = $('<div></div>');
+ $('<span class="label"></span>').text(droppedTokens[0]).appendTo(results);
+ $('<span></span>').text(' FlowFiles (' + droppedTokens[1] + ')').appendTo(results);
+
+ // if the request did not complete, include the original
+ if (dropRequest.percentCompleted < 100) {
+ var originalTokens = dropRequest.original.split(/ \/ /);
+ $('<span class="label"></span>').text(' out of ' + originalTokens[0]).appendTo(results);
+ $('<span></span>').text(' (' + originalTokens[1] + ')').appendTo(results);
+ }
+ $('<span></span>').text(' were removed from the queues.').appendTo(results);
+
+ // if this request failed so the error
+ if (nfCommon.isDefinedAndNotNull(dropRequest.failureReason)) {
+ $('<br/><br/><span></span>').text(dropRequest.failureReason).appendTo(results);
+ }
+
+ // display the results
+ nfDialog.showOkDialog({
+ headerText: 'Empty All Queues',
+ dialogContent: results
+ });
+ }).always(function () {
+ $('#drop-request-status-dialog').modal('hide');
+ });
+ } else {
+ // nothing was removed
+ nfDialog.showOkDialog({
+ headerText: 'Empty All Queues',
+ dialogContent: 'No FlowFiles were removed.'
+ });
+
+ // close the dialog
+ $('#drop-request-status-dialog').modal('hide');
+ }
+ };
+
+ // process the drop request
+ var processDropRequest = function (delay) {
+ // update the percent complete
+ updateProgress(dropRequest.percentCompleted);
+
+ // update the status of the drop request
+ $('#drop-request-status-message').text(dropRequest.state);
+
+ // close the dialog if the
+ if (dropRequest.finished === true || cancelled === true) {
+ completeDropRequest();
+ } else {
+ // wait delay to poll again
+ dropRequestTimer = setTimeout(function () {
+ // clear the drop request timer
+ dropRequestTimer = null;
+
+ // schedule to poll the status again in nextDelay
+ pollDropRequest(Math.min(MAX_DELAY, delay * 2));
+ }, delay * 1000);
+ }
+ };
+
+ // schedule for the next poll iteration
+ var pollDropRequest = function (nextDelay) {
+ $.ajax({
+ type: 'GET',
+ url: dropRequest.uri,
+ dataType: 'json'
+ }).done(function (response) {
+ dropRequest = response.dropRequest;
+ processDropRequest(nextDelay);
+ }).fail(function (xhr, status, error) {
+ if (xhr.status === 403) {
+ nfErrorHandler.handleAjaxError(xhr, status, error);
+ } else {
+ completeDropRequest()
+ }
+ });
+ };
+
+ // issue the request to delete the flow files
+ $.ajax({
+ type: 'POST',
+ url: '../nifi-api/process-groups/' + encodeURIComponent(processGroupId) + '/empty-all-connections-requests',
+ dataType: 'json',
+ contentType: 'application/json'
+ }).done(function (response) {
+ // initialize the progress bar value
+ updateProgress(0);
+
+ // show the progress dialog
+ $('#drop-request-status-dialog').modal('show');
+
+ // process the drop request
+ dropRequest = response.dropRequest;
+ processDropRequest(1);
+ }).fail(function (xhr, status, error) {
+ if (xhr.status === 403) {
+ nfErrorHandler.handleAjaxError(xhr, status, error);
+ } else {
+ completeDropRequest()
+ }
+ });
+ }
+ });
+ },
+
/**
* Lists the flow files in the specified connection.
*
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
index 2699f6d..915139e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
@@ -850,6 +850,8 @@
{id: 'paste-menu-item', condition: isPastable, menuItem: {clazz: 'fa fa-paste', text: 'Paste', action: 'paste'}},
{separator: true},
{id: 'empty-queue-menu-item', condition: canEmptyQueue, menuItem: {clazz: 'fa fa-minus-circle', text: 'Empty queue', action: 'emptyQueue'}},
+ {id: 'empty-all-queues-menu-item', condition: isProcessGroup, menuItem: {clazz: 'fa fa-minus-circle', text: 'Empty all queues', action: 'emptyAllQueues'}},
+ {id: 'empty-all-queues-menu-item-noselection', condition: emptySelection, menuItem: {clazz: 'fa fa-minus-circle', text: 'Empty all queues', action: 'emptyAllQueues'}},
{id: 'delete-menu-item', condition: isDeletable, menuItem: {clazz: 'fa fa-trash', text: 'Delete', action: 'delete'}}
];