You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/09/14 19:55:38 UTC

nifi git commit: NIFI-2754 - Migrating swap to active prior to swapping if necessary. - This closes #1000.

Repository: nifi
Updated Branches:
  refs/heads/master 67a47dbea -> 8a28395e9


NIFI-2754
- Migrating swap to active prior to swapping if necessary.
- This closes #1000.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8a28395e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8a28395e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8a28395e

Branch: refs/heads/master
Commit: 8a28395e9feafdb3af8c76137bfe0f5f7a07e27e
Parents: 67a47db
Author: Peter Wicks <PW...@MICRON.COM>
Authored: Fri Sep 9 22:10:01 2016 -0600
Committer: Matt Gilman <ma...@gmail.com>
Committed: Wed Sep 14 14:27:50 2016 -0400

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowFileQueue.java  |  5 +++-
 .../controller/TestStandardFlowFileQueue.java   | 24 ++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8a28395e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 77f82d5..68af208 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -389,7 +389,8 @@ public class StandardFlowFileQueue implements FlowFileQueue {
             if (flowFile != null) {
                 incrementActiveQueueSize(-1, -flowFile.getSize());
             }
-        } while (isExpired);
+        }
+        while (isExpired);
 
         if (!expiredRecords.isEmpty()) {
             incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
@@ -547,6 +548,8 @@ public class StandardFlowFileQueue implements FlowFileQueue {
             return;
         }
 
+        migrateSwapToActive();
+
         final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
 
         int originalSwapQueueCount = swapQueue.size();

http://git-wip-us.apache.org/repos/asf/nifi/blob/8a28395e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 32c1dc6..3960d8d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -503,6 +504,29 @@ public class TestStandardFlowFileQueue {
         assertNull(status.getFailureReason());
     }
 
+    @Test(timeout = 5000)
+    public void testListFlowFilesResultsLimitedCollection() throws InterruptedException {
+        Collection<FlowFileRecord> tff = new ArrayList<>();
+        //Swap Size is 10000 records, so 30000 is equal to 3 swap files.
+        for (int i = 0; i < 30000; i++) {
+            tff.add(new TestFlowFile());
+        }
+
+        queue.putAll(tff);
+
+        final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100);
+        assertNotNull(status);
+        assertEquals(30000, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(100, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+        assertNull(status.getFailureReason());
+    }
+
 
     private class TestSwapManager implements FlowFileSwapManager {
         private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();