You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/02 20:32:35 UTC
[11/30] nifi git commit: NIFI-730: Updated queue sizes appropriately
during dropping of flowfiles
NIFI-730: Updated queue sizes appropriately during dropping of flowfiles
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/72ff2a25
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/72ff2a25
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/72ff2a25
Branch: refs/heads/master
Commit: 72ff2a25d5455442d5aec27835f953e6ab36eca3
Parents: 4b41aaa
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Oct 13 15:46:56 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Oct 13 15:46:56 2015 -0400
----------------------------------------------------------------------
.../org/apache/nifi/controller/StandardFlowFileQueue.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/72ff2a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index b699ceb..82c1c7e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -924,11 +924,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.lock();
try {
dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES);
+ dropRequest.setOriginalSize(getQueueSize());
try {
final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
QueueSize droppedSize = drop(activeQueueRecords, requestor);
activeQueue.clear();
+ activeQueueContentSize = 0;
+ activeQueueSizeRef.set(0);
dropRequest.setCurrentSize(getQueueSize());
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
@@ -936,6 +939,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
swapQueue.clear();
dropRequest.setCurrentSize(getQueueSize());
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
+ swapMode = false;
+ swappedContentSize -= droppedSize.getByteCount();
+ swappedRecordCount -= droppedSize.getObjectCount();
final Iterator<String> swapLocationItr = swapLocations.iterator();
while (swapLocationItr.hasNext()) {
@@ -943,12 +949,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
try {
droppedSize = drop(swappedIn, requestor);
- dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
} catch (final Exception e) {
activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
throw e;
}
+ dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
+ swappedContentSize -= droppedSize.getByteCount();
+ swappedRecordCount -= droppedSize.getObjectCount();
dropRequest.setCurrentSize(getQueueSize());
swapLocationItr.remove();
}