Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/07/23 16:05:28 UTC

[GitHub] [nifi] tpalfy opened a new pull request #4425: NIFI-7663 Allow user to empty all queues in a Process Group

tpalfy opened a new pull request #4425:
URL: https://github.com/apache/nifi/pull/4425


   https://issues.apache.org/jira/browse/NIFI-7663
   
   Adds an option to empty all queues in a process group, available from the context menu.
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] turcsanyip commented on a change in pull request #4425: NIFI-7663 Allow user to empty all queues in a Process Group

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4425:
URL: https://github.com/apache/nifi/pull/4425#discussion_r466938361



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropAllFlowfilesRequestEndpointMerger.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.regex.Pattern;
+
+public class DropAllFlowfilesRequestEndpointMerger extends DropRequestEndpointMerger {

Review comment:
       Typo: should be `FlowFiles` (capital F in `Files`).

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
##########
@@ -1270,6 +1270,184 @@
             });
         },
 
+        emptyAllQueues: function (selection) {
+            // prompt the user before emptying the queue
+            nfDialog.showYesNoDialog({
+                headerText: 'Empty All Queues',
+                dialogContent: 'Are you sure you want to empty all queues in this Process Group? All FlowFiles from all connections waiting at the time of the request will be removed.',
+                noText: 'Cancel',
+                yesText: 'Empty All',
+                yesHandler: function () {
+                    var processGroupId;
+                    if (selection.empty()) {
+                        processGroupId = nfCanvasUtils.getGroupId();
+                    } else {
+                        processGroupId = selection.datum().id;
+                    }
+
+                    var MAX_DELAY = 4;
+                    var cancelled = false;
+                    var dropRequest = null;
+                    var dropRequestTimer = null;
+
+                    // updates the progress bar
+                    var updateProgress = function (percentComplete) {
+                        // remove existing labels
+                        var progressBar = $('#drop-request-percent-complete');
+                        progressBar.find('div.progress-label').remove();
+                        progressBar.find('md-progress-linear').remove();
+
+                        // update the progress bar
+                        var label = $('<div class="progress-label"></div>').text(percentComplete + '%');
+                        (nfNgBridge.injector.get('$compile')($('<md-progress-linear ng-cloak ng-value="' + percentComplete + '" class="md-hue-2" md-mode="determinate" aria-label="Drop request percent complete"></md-progress-linear>'))(nfNgBridge.rootScope)).appendTo(progressBar);
+                        progressBar.append(label);
+                    };
+
+                    // update the button model of the drop request status dialog
+                    $('#drop-request-status-dialog').modal('setButtonModel', [{
+                        buttonText: 'Stop',
+                        color: {
+                            base: '#728E9B',
+                            hover: '#004849',
+                            text: '#ffffff'
+                        },
+                        handler: {
+                            click: function () {
+                                cancelled = true;
+
+                                // we are waiting for the next poll attempt
+                                if (dropRequestTimer !== null) {
+                                    // cancel it
+                                    clearTimeout(dropRequestTimer);
+
+                                    // cancel the drop request
+                                    completeDropRequest();
+                                }
+                            }
+                        }
+                    }]);
+
+                    // completes the drop request by removing it and showing how many flowfiles were deleted
+                    var completeDropRequest = function () {
+                        nfCanvasUtils.reload();
+
+                        // clean up as appropriate
+                        if (nfCommon.isDefinedAndNotNull(dropRequest)) {
+                            $.ajax({
+                                type: 'DELETE',
+                                url: dropRequest.uri,
+                                dataType: 'json'
+                            }).done(function (response) {
+                                // report the results of this drop request
+                                dropRequest = response.dropRequest;
+
+                                // build the results
+                                var droppedTokens = dropRequest.dropped.split(/ \/ /);
+                                var results = $('<div></div>');
+                                $('<span class="label"></span>').text(droppedTokens[0]).appendTo(results);
+                                $('<span></span>').text(' FlowFiles (' + droppedTokens[1] + ')').appendTo(results);
+
+                                // if the request did not complete, include the original
+                                if (dropRequest.percentCompleted < 100) {
+                                    var originalTokens = dropRequest.original.split(/ \/ /);
+                                    $('<span class="label"></span>').text(' out of ' + originalTokens[0]).appendTo(results);
+                                    $('<span></span>').text(' (' + originalTokens[1] + ')').appendTo(results);
+                                }
+                                $('<span></span>').text(' were removed from the queue.').appendTo(results);

Review comment:
       Typo: should read "... from the queues."







[GitHub] [nifi] turcsanyip commented on a change in pull request #4425: NIFI-7663 Allow user to empty all queues in a Process Group

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4425:
URL: https://github.com/apache/nifi/pull/4425#discussion_r466332998



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
##########
@@ -484,6 +485,41 @@
      */
     List<Connection> findAllConnections();
 
+    /**
+     * Initiates a request to drop all FlowFiles in all connections under this process group (recursively).
+     * This method returns a DropFlowFileStatus that can be used to determine the current state of the request.
+     * Additionally, the DropFlowFileStatus provides a request identifier that can then be
+     * passed to the {@link #getDropAllFlowFilesStatus(String)} and {@link #cancelDropAllFlowFiles(String)}
+     * methods in order to obtain the status later or cancel a request
+     *
+     * @param requestIdentifier the identifier of the Drop FlowFile Request
+     * @param requestor the entity that is requesting that the FlowFiles be dropped; this will be
+     *            included in the Provenance Events that are generated.
+     *
+     * @return the status of the drop request, or <code>null</code> if there is no
+     *         connection in the process group.
+     */
+    DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String requestor);
+
+    /**
+     * Returns the current status of a Drop AlL FlowFiles Request that was initiated via the

Review comment:
       Typo: All

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
##########
@@ -484,6 +485,41 @@
      */
     List<Connection> findAllConnections();
 
+    /**
+     * Initiates a request to drop all FlowFiles in all connections under this process group (recursively).
+     * This method returns a DropFlowFileStatus that can be used to determine the current state of the request.
+     * Additionally, the DropFlowFileStatus provides a request identifier that can then be
+     * passed to the {@link #getDropAllFlowFilesStatus(String)} and {@link #cancelDropAllFlowFiles(String)}
+     * methods in order to obtain the status later or cancel a request
+     *
+     * @param requestIdentifier the identifier of the Drop FlowFile Request
+     * @param requestor the entity that is requesting that the FlowFiles be dropped; this will be
+     *            included in the Provenance Events that are generated.
+     *
+     * @return the status of the drop request, or <code>null</code> if there is no
+     *         connection in the process group.
+     */
+    DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String requestor);
+
+    /**
+     * Returns the current status of a Drop AlL FlowFiles Request that was initiated via the
+     * {@link #dropAllFlowFiles(String, String)} method with the given identifier
+     *
+     * @param requestIdentifier the identifier of the Drop FlowFile Request

Review comment:
       Should read: Drop All FlowFiles Request

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
##########
@@ -1260,6 +1273,87 @@ public Connection findConnection(final String id) {
         return findAllConnections(this);
     }
 
+    @Override
+    public DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String requestor) {
+        return handleDropAllFlowFiles(requestIdentifier, queue -> queue.dropFlowFiles(requestIdentifier, requestor));
+    }
+
+    @Override
+    public DropFlowFileStatus getDropAllFlowFilesStatus(String requestIdentifier) {
+        return handleDropAllFlowFiles(requestIdentifier, queue -> queue.getDropFlowFileStatus(requestIdentifier));
+    }
+
+    @Override
+    public DropFlowFileStatus cancelDropAllFlowFiles(String requestIdentifier) {
+        return handleDropAllFlowFiles(requestIdentifier, queue -> queue.cancelDropFlowFileRequest(requestIdentifier));
+    }
+
+    private DropFlowFileStatus handleDropAllFlowFiles(String dropRequestId, Function<FlowFileQueue, DropFlowFileStatus> function) {
+        DropFlowFileStatus resultDropFlowFileStatus;
+
+        List<Connection> connections = findAllConnections(this);
+
+        DropFlowFileRequest aggregateDropFlowFileStatus = new DropFlowFileRequest(dropRequestId);
+        aggregateDropFlowFileStatus.setState(null);
+
+        AtomicBoolean processedAtLeastOne = new AtomicBoolean(false);
+
+        connections.stream()
+            .map(Connection::getFlowFileQueue)
+            .map(function::apply)
+            .forEach(additionalDropFlowFileStatus -> {
+                aggregate(aggregateDropFlowFileStatus, additionalDropFlowFileStatus);
+                processedAtLeastOne.set(true);
+            });
+
+        if (processedAtLeastOne.get()) {
+            resultDropFlowFileStatus = aggregateDropFlowFileStatus;
+        } else {
+            resultDropFlowFileStatus = null;
+        }
+
+        return resultDropFlowFileStatus;
+    }
+
+    private void aggregate(DropFlowFileRequest aggregateDropFlowFileStatus, DropFlowFileStatus additionalDropFlowFileStatus) {
+        QueueSize aggregateOriginalSize = aggregate(aggregateDropFlowFileStatus.getOriginalSize(), additionalDropFlowFileStatus.getOriginalSize());
+        QueueSize aggregateDroppedSize = aggregate(aggregateDropFlowFileStatus.getDroppedSize(), additionalDropFlowFileStatus.getDroppedSize());
+        QueueSize aggregateCurrentSize = aggregate(aggregateDropFlowFileStatus.getCurrentSize(), additionalDropFlowFileStatus.getCurrentSize());
+        DropFlowFileState aggregateState = aggregate(aggregateDropFlowFileStatus.getState(), additionalDropFlowFileStatus.getState());
+
+        aggregateDropFlowFileStatus.setOriginalSize(aggregateOriginalSize);
+        aggregateDropFlowFileStatus.setDroppedSize(aggregateDroppedSize);
+        aggregateDropFlowFileStatus.setCurrentSize(aggregateCurrentSize);
+        aggregateDropFlowFileStatus.setState(aggregateState);
+    }
+
+    private QueueSize aggregate(QueueSize size1, QueueSize size2) {
+        int objectsNr = Optional.ofNullable(size1)
+            .map(size -> size1.getObjectCount() + size2.getObjectCount())

Review comment:
       `.map(size -> size.getObjectCount() + size2.getObjectCount())` would be more natural and readable IMO.
   As written, the lambda ignores its parameter (the value wrapped by the `Optional`) and reads `size1` directly.
   The same applies to line 1336.
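
A minimal, self-contained sketch of the suggested form, for illustration only. `QueueSizeAggregation` and the nested `QueueSize` class are hypothetical stand-ins (not the NiFi classes), and the `orElse` fallback is an assumption, since the quoted diff is cut off before that part of the method:

```java
import java.util.Optional;

public class QueueSizeAggregation {

    // Hypothetical stand-in for the QueueSize type in the diff; only the
    // two accessors needed for this illustration are modelled.
    static final class QueueSize {
        private final int objectCount;
        private final long byteCount;

        QueueSize(int objectCount, long byteCount) {
            this.objectCount = objectCount;
            this.byteCount = byteCount;
        }

        int getObjectCount() {
            return objectCount;
        }

        long getByteCount() {
            return byteCount;
        }
    }

    // Adds size2 to size1. A null size1 is treated as "nothing aggregated yet"
    // (assumption: the original method's fallback is not visible in the diff).
    static QueueSize aggregate(QueueSize size1, QueueSize size2) {
        int objectCount = Optional.ofNullable(size1)
            // the lambda parameter is the wrapped value, so use it directly
            .map(size -> size.getObjectCount() + size2.getObjectCount())
            .orElse(size2.getObjectCount());

        long byteCount = Optional.ofNullable(size1)
            .map(size -> size.getByteCount() + size2.getByteCount())
            .orElse(size2.getByteCount());

        return new QueueSize(objectCount, byteCount);
    }

    public static void main(String[] args) {
        QueueSize total = aggregate(null, new QueueSize(3, 300L));
        total = aggregate(total, new QueueSize(2, 50L));
        System.out.println(total.getObjectCount() + " / " + total.getByteCount());
    }
}
```

Behaviour is the same as referencing `size1` inside the lambda; the only difference is that the value wrapped by the `Optional` is the one actually used.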

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
##########
@@ -484,6 +485,41 @@
      */
     List<Connection> findAllConnections();
 
+    /**
+     * Initiates a request to drop all FlowFiles in all connections under this process group (recursively).
+     * This method returns a DropFlowFileStatus that can be used to determine the current state of the request.
+     * Additionally, the DropFlowFileStatus provides a request identifier that can then be
+     * passed to the {@link #getDropAllFlowFilesStatus(String)} and {@link #cancelDropAllFlowFiles(String)}
+     * methods in order to obtain the status later or cancel a request
+     *
+     * @param requestIdentifier the identifier of the Drop FlowFile Request

Review comment:
       Should read: Drop All FlowFiles Request







[GitHub] [nifi] asfgit closed pull request #4425: NIFI-7663 Allow user to empty all queues in a Process Group

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4425:
URL: https://github.com/apache/nifi/pull/4425


   

