You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/12/27 17:52:25 UTC

nifi git commit: NIFI-3250 Fixing logic in StandardFlowFileQueue when migrating flow files to the active queue

Repository: nifi
Updated Branches:
  refs/heads/master af8ed8b7d -> 978f483ba


NIFI-3250 Fixing logic in StandardFlowFileQueue when migrating flow files to the active queue

This closes #3250.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/978f483b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/978f483b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/978f483b

Branch: refs/heads/master
Commit: 978f483ba84b323674fc0134660bed2ee30da079
Parents: af8ed8b
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Dec 22 15:48:45 2016 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Dec 27 12:52:01 2016 -0500

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowFileQueue.java  |  3 +-
 .../controller/TestStandardFlowFileQueue.java   | 65 ++++++++++++++++++--
 2 files changed, 62 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/978f483b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index e20e250..ba01338 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -448,7 +448,8 @@ public class StandardFlowFileQueue implements FlowFileQueue {
         // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
         // to disk, because we want them to be swapped back in in the same order that they were swapped out.
 
-        if (activeQueue.size() > swapThreshold - SWAP_RECORD_POLL_SIZE) {
+        final int activeQueueSize = activeQueue.size();
+        if (activeQueueSize > 0 && activeQueueSize > swapThreshold - SWAP_RECORD_POLL_SIZE) {
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/978f483b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 3960d8d..fc10d00 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -74,6 +74,12 @@ public class TestStandardFlowFileQueue {
     private TestSwapManager swapManager = null;
     private StandardFlowFileQueue queue = null;
 
+    private Connection connection = null;
+    private FlowFileRepository flowFileRepo = null;
+    private ProvenanceEventRepository provRepo = null;
+    private ResourceClaimManager claimManager = null;
+    private ProcessScheduler scheduler = null;
+
     private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
 
     @BeforeClass
@@ -86,16 +92,16 @@ public class TestStandardFlowFileQueue {
     public void setup() {
         provRecords.clear();
 
-        final Connection connection = Mockito.mock(Connection.class);
+        connection = Mockito.mock(Connection.class);
         Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
         Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
 
-        final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+        scheduler = Mockito.mock(ProcessScheduler.class);
         swapManager = new TestSwapManager();
 
-        final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
-        final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class);
-        final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
+        flowFileRepo = Mockito.mock(FlowFileRepository.class);
+        provRepo = Mockito.mock(ProvenanceEventRepository.class);
+        claimManager = Mockito.mock(ResourceClaimManager.class);
 
         Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
         Mockito.doAnswer(new Answer<Object>() {
@@ -383,6 +389,55 @@ public class TestStandardFlowFileQueue {
     }
 
     @Test
+    public void testSwapInWhenThresholdIsLessThanSwapSize() {
+        // create a queue where the swap threshold is less than 10k
+        queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000);
+
+        for (int i = 1; i <= 20000; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        assertEquals(1, swapManager.swappedOut.size());
+        queue.put(new TestFlowFile());
+        assertEquals(1, swapManager.swappedOut.size());
+
+        final Set<FlowFileRecord> exp = new HashSet<>();
+
+        // At this point there should be:
+        // 1k flow files in the active queue
+        // 9,001 flow files in the swap queue
+        // 10k flow files swapped to disk
+
+        for (int i = 0; i < 999; i++) { //
+            final FlowFileRecord flowFile = queue.poll(exp);
+            assertNotNull(flowFile);
+            assertEquals(1, queue.getUnacknowledgedQueueSize().getObjectCount());
+            assertEquals(1, queue.getUnacknowledgedQueueSize().getByteCount());
+
+            queue.acknowledge(Collections.singleton(flowFile));
+            assertEquals(0, queue.getUnacknowledgedQueueSize().getObjectCount());
+            assertEquals(0, queue.getUnacknowledgedQueueSize().getByteCount());
+        }
+
+        assertEquals(0, swapManager.swapInCalledCount);
+        assertEquals(1, queue.getActiveQueueSize().getObjectCount());
+        assertNotNull(queue.poll(exp));
+
+        assertEquals(0, swapManager.swapInCalledCount);
+        assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+
+        assertEquals(1, swapManager.swapOutCalledCount);
+
+        assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
+        assertEquals(1, swapManager.swapInCalledCount);
+        assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
+
+        assertTrue(swapManager.swappedOut.isEmpty());
+
+        queue.poll(exp);
+    }
+
+    @Test
     public void testQueueCountsUpdatedWhenIncompleteSwapFile() {
         for (int i = 1; i <= 20000; i++) {
             queue.put(new TestFlowFile());