You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/05 18:12:20 UTC

nifi git commit: NIFI-1110: Fixed bug that caused queue size to become negative when FlowFiles are expired

Repository: nifi
Updated Branches:
  refs/heads/master 4ac52bfc0 -> 98f5a1ab7


NIFI-1110: Fixed bug that caused queue size to become negative when FlowFiles are expired

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/98f5a1ab
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/98f5a1ab
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/98f5a1ab

Branch: refs/heads/master
Commit: 98f5a1ab779eb3d49f95eda36844d1398af1a1f6
Parents: 4ac52bf
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Nov 4 14:24:08 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Nov 5 12:12:02 2015 -0500

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowFileQueue.java  | 41 ++++++++++++++++----
 .../controller/TestStandardFlowFileQueue.java   | 29 ++++++++++++++
 2 files changed, 63 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/98f5a1ab/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 3986ca8..0b3c661 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -395,7 +395,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         migrateSwapToActive();
         final boolean queueFullAtStart = queueFullRef.get();
 
-        int expiredRecordCount = 0;
         long expiredBytes = 0L;
 
         do {
@@ -404,8 +403,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
             if (isExpired) {
                 expiredRecords.add(flowFile);
-                expiredRecordCount++;
                 expiredBytes += flowFile.getSize();
+                flowFile = null;
 
                 if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
                     break;
@@ -419,12 +418,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             if (flowFile != null) {
                 incrementActiveQueueSize(-1, -flowFile.getSize());
             }
-
-            if (expiredRecordCount > 0) {
-                incrementActiveQueueSize(-expiredRecordCount, -expiredBytes);
-            }
         } while (isExpired);
 
+        if (!expiredRecords.isEmpty()) {
+            incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
+        }
+
         // if at least 1 FlowFile was expired & the queue was full before we started, then
         // we need to determine whether or not the queue is full again. If no FlowFile was expired,
         // then the queue will still be full until the appropriate #acknowledge method is called.
@@ -432,7 +431,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             queueFullRef.set(determineIfFull());
         }
 
-        return isExpired ? null : flowFile;
+        return flowFile;
     }
 
     @Override
@@ -1198,6 +1197,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount + count, original.activeQueueBytes + bytes,
                 original.swappedCount, original.swappedBytes, original.unacknowledgedCount, original.unacknowledgedBytes);
             updated = size.compareAndSet(original, newSize);
+
+            if (updated) {
+                logIfNegative(original, newSize, "active");
+            }
         }
     }
 
@@ -1208,6 +1211,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes,
                 original.swappedCount + count, original.swappedBytes + bytes, original.unacknowledgedCount, original.unacknowledgedBytes);
             updated = size.compareAndSet(original, newSize);
+
+            if (updated) {
+                logIfNegative(original, newSize, "swap");
+            }
         }
     }
 
@@ -1218,6 +1225,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes,
                 original.swappedCount, original.swappedBytes, original.unacknowledgedCount + count, original.unacknowledgedBytes + bytes);
             updated = size.compareAndSet(original, newSize);
+
+            if (updated) {
+                logIfNegative(original, newSize, "Unacknowledged");
+            }
+        }
+    }
+
+    private void logIfNegative(final FlowFileQueueSize original, final FlowFileQueueSize newSize, final String counterName) {
+        if (newSize.activeQueueBytes < 0 || newSize.activeQueueCount < 0 || newSize.swappedBytes < 0 || newSize.swappedCount < 0 ||
+            newSize.unacknowledgedBytes < 0 || newSize.unacknowledgedCount < 0) {
+
+            logger.error("Updated Size of Queue " + counterName + " from " + original + " to " + newSize, new RuntimeException("Cannot create negative queue size"));
+
         }
     }
 
@@ -1259,5 +1279,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         public QueueSize swapQueueSize() {
             return new QueueSize(swappedCount, swappedBytes);
         }
+
+        @Override
+        public String toString() {
+            return "FlowFile Queue Size[ ActiveQueue=[" + activeQueueCount + ", " + activeQueueBytes +
+                " Bytes], Swap Queue=[" + swappedCount + ", " + swappedBytes +
+                " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]";
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/98f5a1ab/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 7ef5fc8..8b8c678 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -76,6 +77,34 @@ public class TestStandardFlowFileQueue {
         TestFlowFile.idGenerator.set(0L);
     }
 
+    @Test
+    public void testExpire() {
+        queue.setFlowFileExpiration("1 ms");
+
+        for (int i = 0; i < 100; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        // just make sure that the flowfiles have time to expire.
+        try {
+            Thread.sleep(100L);
+        } catch (final InterruptedException ie) {
+        }
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>(100);
+        final FlowFileRecord pulled = queue.poll(expiredRecords);
+
+        assertNull(pulled);
+        assertEquals(100, expiredRecords.size());
+
+        final QueueSize activeSize = queue.getActiveQueueSize();
+        assertEquals(0, activeSize.getObjectCount());
+        assertEquals(0L, activeSize.getByteCount());
+
+        final QueueSize unackSize = queue.getUnacknowledgedQueueSize();
+        assertEquals(0, unackSize.getObjectCount());
+        assertEquals(0L, unackSize.getByteCount());
+    }
 
     @Test
     public void testSwapOutOccurs() {