You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/05 18:12:20 UTC
nifi git commit: NIFI-1110: Fixed bug that caused queue size to
become negative when FlowFiles are expired
Repository: nifi
Updated Branches:
refs/heads/master 4ac52bfc0 -> 98f5a1ab7
NIFI-1110: Fixed bug that caused queue size to become negative when FlowFiles are expired
Signed-off-by: Mark Payne <ma...@hotmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/98f5a1ab
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/98f5a1ab
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/98f5a1ab
Branch: refs/heads/master
Commit: 98f5a1ab779eb3d49f95eda36844d1398af1a1f6
Parents: 4ac52bf
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Nov 4 14:24:08 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Nov 5 12:12:02 2015 -0500
----------------------------------------------------------------------
.../nifi/controller/StandardFlowFileQueue.java | 41 ++++++++++++++++----
.../controller/TestStandardFlowFileQueue.java | 29 ++++++++++++++
2 files changed, 63 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/98f5a1ab/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 3986ca8..0b3c661 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -395,7 +395,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
migrateSwapToActive();
final boolean queueFullAtStart = queueFullRef.get();
- int expiredRecordCount = 0;
long expiredBytes = 0L;
do {
@@ -404,8 +403,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
if (isExpired) {
expiredRecords.add(flowFile);
- expiredRecordCount++;
expiredBytes += flowFile.getSize();
+ flowFile = null;
if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
break;
@@ -419,12 +418,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
if (flowFile != null) {
incrementActiveQueueSize(-1, -flowFile.getSize());
}
-
- if (expiredRecordCount > 0) {
- incrementActiveQueueSize(-expiredRecordCount, -expiredBytes);
- }
} while (isExpired);
+ if (!expiredRecords.isEmpty()) {
+ incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
+ }
+
// if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired,
// then the queue will still be full until the appropriate #acknowledge method is called.
@@ -432,7 +431,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
queueFullRef.set(determineIfFull());
}
- return isExpired ? null : flowFile;
+ return flowFile;
}
@Override
@@ -1198,6 +1197,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount + count, original.activeQueueBytes + bytes,
original.swappedCount, original.swappedBytes, original.unacknowledgedCount, original.unacknowledgedBytes);
updated = size.compareAndSet(original, newSize);
+
+ if (updated) {
+ logIfNegative(original, newSize, "active");
+ }
}
}
@@ -1208,6 +1211,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes,
original.swappedCount + count, original.swappedBytes + bytes, original.unacknowledgedCount, original.unacknowledgedBytes);
updated = size.compareAndSet(original, newSize);
+
+ if (updated) {
+ logIfNegative(original, newSize, "swap");
+ }
}
}
@@ -1218,6 +1225,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes,
original.swappedCount, original.swappedBytes, original.unacknowledgedCount + count, original.unacknowledgedBytes + bytes);
updated = size.compareAndSet(original, newSize);
+
+ if (updated) {
+ logIfNegative(original, newSize, "Unacknowledged");
+ }
+ }
+ }
+
+ private void logIfNegative(final FlowFileQueueSize original, final FlowFileQueueSize newSize, final String counterName) {
+ if (newSize.activeQueueBytes < 0 || newSize.activeQueueCount < 0 || newSize.swappedBytes < 0 || newSize.swappedCount < 0 ||
+ newSize.unacknowledgedBytes < 0 || newSize.unacknowledgedCount < 0) {
+
+ logger.error("Updated Size of Queue " + counterName + " from " + original + " to " + newSize, new RuntimeException("Cannot create negative queue size"));
+
}
}
@@ -1259,5 +1279,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
public QueueSize swapQueueSize() {
return new QueueSize(swappedCount, swappedBytes);
}
+
+ @Override
+ public String toString() {
+ return "FlowFile Queue Size[ ActiveQueue=[" + activeQueueCount + ", " + activeQueueBytes +
+ " Bytes], Swap Queue=[" + swappedCount + ", " + swappedBytes +
+ " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]";
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/98f5a1ab/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 7ef5fc8..8b8c678 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -76,6 +77,34 @@ public class TestStandardFlowFileQueue {
TestFlowFile.idGenerator.set(0L);
}
+ @Test
+ public void testExpire() {
+ queue.setFlowFileExpiration("1 ms");
+
+ for (int i = 0; i < 100; i++) {
+ queue.put(new TestFlowFile());
+ }
+
+ // just make sure that the flowfiles have time to expire.
+ try {
+ Thread.sleep(100L);
+ } catch (final InterruptedException ie) {
+ }
+
+ final Set<FlowFileRecord> expiredRecords = new HashSet<>(100);
+ final FlowFileRecord pulled = queue.poll(expiredRecords);
+
+ assertNull(pulled);
+ assertEquals(100, expiredRecords.size());
+
+ final QueueSize activeSize = queue.getActiveQueueSize();
+ assertEquals(0, activeSize.getObjectCount());
+ assertEquals(0L, activeSize.getByteCount());
+
+ final QueueSize unackSize = queue.getUnacknowledgedQueueSize();
+ assertEquals(0, unackSize.getObjectCount());
+ assertEquals(0L, unackSize.getByteCount());
+ }
@Test
public void testSwapOutOccurs() {