You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/12/27 17:52:25 UTC
nifi git commit: NIFI-3250 Fixing logic in StandardFlowFileQueue when
migrating flow files to the active queue
Repository: nifi
Updated Branches:
refs/heads/master af8ed8b7d -> 978f483ba
NIFI-3250 Fixing logic in StandardFlowFileQueue when migrating flow files to the active queue
This closes #3250.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/978f483b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/978f483b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/978f483b
Branch: refs/heads/master
Commit: 978f483ba84b323674fc0134660bed2ee30da079
Parents: af8ed8b
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Dec 22 15:48:45 2016 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Dec 27 12:52:01 2016 -0500
----------------------------------------------------------------------
.../nifi/controller/StandardFlowFileQueue.java | 3 +-
.../controller/TestStandardFlowFileQueue.java | 65 ++++++++++++++++++--
2 files changed, 62 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/978f483b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index e20e250..ba01338 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -448,7 +448,8 @@ public class StandardFlowFileQueue implements FlowFileQueue {
// Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
// to disk, because we want them to be swapped back in in the same order that they were swapped out.
- if (activeQueue.size() > swapThreshold - SWAP_RECORD_POLL_SIZE) {
+ final int activeQueueSize = activeQueue.size();
+ if (activeQueueSize > 0 && activeQueueSize > swapThreshold - SWAP_RECORD_POLL_SIZE) {
return;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/978f483b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 3960d8d..fc10d00 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -74,6 +74,12 @@ public class TestStandardFlowFileQueue {
private TestSwapManager swapManager = null;
private StandardFlowFileQueue queue = null;
+ private Connection connection = null;
+ private FlowFileRepository flowFileRepo = null;
+ private ProvenanceEventRepository provRepo = null;
+ private ResourceClaimManager claimManager = null;
+ private ProcessScheduler scheduler = null;
+
private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
@BeforeClass
@@ -86,16 +92,16 @@ public class TestStandardFlowFileQueue {
public void setup() {
provRecords.clear();
- final Connection connection = Mockito.mock(Connection.class);
+ connection = Mockito.mock(Connection.class);
Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
- final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+ scheduler = Mockito.mock(ProcessScheduler.class);
swapManager = new TestSwapManager();
- final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
- final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class);
- final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
+ flowFileRepo = Mockito.mock(FlowFileRepository.class);
+ provRepo = Mockito.mock(ProvenanceEventRepository.class);
+ claimManager = Mockito.mock(ResourceClaimManager.class);
Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
Mockito.doAnswer(new Answer<Object>() {
@@ -383,6 +389,55 @@ public class TestStandardFlowFileQueue {
}
@Test
+ public void testSwapInWhenThresholdIsLessThanSwapSize() {
+ // create a queue where the swap threshold is less than 10k
+ queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000);
+
+ for (int i = 1; i <= 20000; i++) {
+ queue.put(new TestFlowFile());
+ }
+
+ assertEquals(1, swapManager.swappedOut.size());
+ queue.put(new TestFlowFile());
+ assertEquals(1, swapManager.swappedOut.size());
+
+ final Set<FlowFileRecord> exp = new HashSet<>();
+
+ // At this point there should be:
+ // 1k flow files in the active queue
+ // 9,001 flow files in the swap queue
+ // 10k flow files swapped to disk
+
+ for (int i = 0; i < 999; i++) { //
+ final FlowFileRecord flowFile = queue.poll(exp);
+ assertNotNull(flowFile);
+ assertEquals(1, queue.getUnacknowledgedQueueSize().getObjectCount());
+ assertEquals(1, queue.getUnacknowledgedQueueSize().getByteCount());
+
+ queue.acknowledge(Collections.singleton(flowFile));
+ assertEquals(0, queue.getUnacknowledgedQueueSize().getObjectCount());
+ assertEquals(0, queue.getUnacknowledgedQueueSize().getByteCount());
+ }
+
+ assertEquals(0, swapManager.swapInCalledCount);
+ assertEquals(1, queue.getActiveQueueSize().getObjectCount());
+ assertNotNull(queue.poll(exp));
+
+ assertEquals(0, swapManager.swapInCalledCount);
+ assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+
+ assertEquals(1, swapManager.swapOutCalledCount);
+
+ assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
+ assertEquals(1, swapManager.swapInCalledCount);
+ assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
+
+ assertTrue(swapManager.swappedOut.isEmpty());
+
+ queue.poll(exp);
+ }
+
+ @Test
public void testQueueCountsUpdatedWhenIncompleteSwapFile() {
for (int i = 1; i <= 20000; i++) {
queue.put(new TestFlowFile());