You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/14 17:11:56 UTC
nifi git commit: NIFI-730: Make cancel request actually cancel
Repository: nifi
Updated Branches:
refs/heads/NIFI-730 5867193bc -> a2ae99f89
NIFI-730: Make cancel request actually cancel
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a2ae99f8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a2ae99f8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a2ae99f8
Branch: refs/heads/NIFI-730
Commit: a2ae99f89965a3fe1bd6591204bdb187a377ae1c
Parents: 5867193
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 14 11:11:48 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Oct 14 11:11:48 2015 -0400
----------------------------------------------------------------------
.../apache/nifi/controller/DropFlowFileRequest.java | 2 +-
.../nifi/controller/StandardFlowFileQueue.java | 15 +++++++++++++++
2 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a2ae99f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
index 58695c2..7eea86a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
@@ -77,7 +77,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
}
@Override
- public DropFlowFileState getState() {
+ public synchronized DropFlowFileState getState() {
return state;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a2ae99f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 8085760..062c424 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -952,6 +952,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
QueueSize droppedSize;
try {
+ if (dropRequest.getState() == DropFlowFileState.CANCELED) {
+ logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
+ return;
+ }
+
droppedSize = drop(activeQueueRecords, requestor);
logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, droppedSize);
} catch (final IOException ioe) {
@@ -972,6 +977,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}",
requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount());
+ if (dropRequest.getState() == DropFlowFileState.CANCELED) {
+ logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
+ return;
+ }
+
droppedSize = drop(swapQueue, requestor);
} catch (final IOException ioe) {
logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
@@ -995,6 +1005,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
List<FlowFileRecord> swappedIn = null;
try {
+ if (dropRequest.getState() == DropFlowFileState.CANCELED) {
+ logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
+ return;
+ }
+
swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
droppedSize = drop(swappedIn, requestor);
} catch (final IOException ioe) {