You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/03 16:36:45 UTC

[17/40] nifi git commit: NIFI-730: Make cancel request actually cancel

NIFI-730: Make cancel request actually cancel


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a2ae99f8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a2ae99f8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a2ae99f8

Branch: refs/heads/NIFI-274
Commit: a2ae99f89965a3fe1bd6591204bdb187a377ae1c
Parents: 5867193
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 14 11:11:48 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Oct 14 11:11:48 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/controller/DropFlowFileRequest.java  |  2 +-
 .../nifi/controller/StandardFlowFileQueue.java       | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a2ae99f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
index 58695c2..7eea86a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
@@ -77,7 +77,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
     }
 
     @Override
-    public DropFlowFileState getState() {
+    public synchronized DropFlowFileState getState() {
         return state;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a2ae99f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 8085760..062c424 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -952,6 +952,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
                         QueueSize droppedSize;
                         try {
+                            if (dropRequest.getState() == DropFlowFileState.CANCELED) {
+                                logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
+                                return;
+                            }
+
                             droppedSize = drop(activeQueueRecords, requestor);
                             logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, droppedSize);
                         } catch (final IOException ioe) {
@@ -972,6 +977,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
                             logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}",
                                 requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount());
+                            if (dropRequest.getState() == DropFlowFileState.CANCELED) {
+                                logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
+                                return;
+                            }
+
                             droppedSize = drop(swapQueue, requestor);
                         } catch (final IOException ioe) {
                             logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
@@ -995,6 +1005,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
                             List<FlowFileRecord> swappedIn = null;
                             try {
+                                if (dropRequest.getState() == DropFlowFileState.CANCELED) {
+                                    logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
+                                    return;
+                                }
+
                                 swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
                                 droppedSize = drop(swappedIn, requestor);
                             } catch (final IOException ioe) {