You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/02 20:32:35 UTC

[11/30] nifi git commit: NIFI-730: Updated queue sizes appropriately during dropping of flowfiles

NIFI-730: Updated queue sizes appropriately during dropping of flowfiles


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/72ff2a25
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/72ff2a25
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/72ff2a25

Branch: refs/heads/master
Commit: 72ff2a25d5455442d5aec27835f953e6ab36eca3
Parents: 4b41aaa
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Oct 13 15:46:56 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Oct 13 15:46:56 2015 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/controller/StandardFlowFileQueue.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/72ff2a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index b699ceb..82c1c7e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -924,11 +924,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                 writeLock.lock();
                 try {
                     dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES);
+                    dropRequest.setOriginalSize(getQueueSize());
 
                     try {
                         final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
                         QueueSize droppedSize = drop(activeQueueRecords, requestor);
                         activeQueue.clear();
+                        activeQueueContentSize = 0;
+                        activeQueueSizeRef.set(0);
                         dropRequest.setCurrentSize(getQueueSize());
                         dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
 
@@ -936,6 +939,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                         swapQueue.clear();
                         dropRequest.setCurrentSize(getQueueSize());
                         dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
+                        swapMode = false;
+                        swappedContentSize -= droppedSize.getByteCount();
+                        swappedRecordCount -= droppedSize.getObjectCount();
 
                         final Iterator<String> swapLocationItr = swapLocations.iterator();
                         while (swapLocationItr.hasNext()) {
@@ -943,12 +949,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                             final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
                             try {
                                 droppedSize = drop(swappedIn, requestor);
-                                dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
                             } catch (final Exception e) {
                                 activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
                                 throw e;
                             }
 
+                            dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
+                            swappedContentSize -= droppedSize.getByteCount();
+                            swappedRecordCount -= droppedSize.getObjectCount();
                             dropRequest.setCurrentSize(getQueueSize());
                             swapLocationItr.remove();
                         }