You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:29:00 UTC

[04/14] nifi git commit: NIFI-5516: Implement Load-Balanced Connections Refactoring StandardFlowFileQueue to have an AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added documentation, cleaned up code some Refactored FlowFileQueue so th

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 303ca7b..b454d2d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -17,47 +17,20 @@
 
 package org.apache.nifi.controller;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.queue.DropFlowFileState;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
-import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.ListFlowFileState;
 import org.apache.nifi.controller.queue.ListFlowFileStatus;
+import org.apache.nifi.controller.queue.NopConnectionEventListener;
 import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.StandardFlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.IncompleteSwapFileException;
-import org.apache.nifi.controller.repository.SwapContents;
-import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
-import org.apache.nifi.controller.repository.SwapSummary;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.swap.StandardSwapContents;
-import org.apache.nifi.controller.swap.StandardSwapSummary;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
@@ -71,8 +44,22 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 public class TestStandardFlowFileQueue {
-    private TestSwapManager swapManager = null;
+    private MockSwapManager swapManager = null;
     private StandardFlowFileQueue queue = null;
 
     private Connection connection = null;
@@ -98,7 +85,7 @@ public class TestStandardFlowFileQueue {
         Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
 
         scheduler = Mockito.mock(ProcessScheduler.class);
-        swapManager = new TestSwapManager();
+        swapManager = new MockSwapManager();
 
         flowFileRepo = Mockito.mock(FlowFileRepository.class);
         provRepo = Mockito.mock(ProvenanceEventRepository.class);
@@ -116,8 +103,8 @@ public class TestStandardFlowFileQueue {
             }
         }).when(provRepo).registerEvents(Mockito.any(Iterable.class));
 
-        queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, 0L, "0 B");
-        TestFlowFile.idGenerator.set(0L);
+        queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, 0L, "0 B");
+        MockFlowFileRecord.resetIdGenerator();
     }
 
     @Test
@@ -125,7 +112,7 @@ public class TestStandardFlowFileQueue {
         queue.setFlowFileExpiration("1 ms");
 
         for (int i = 0; i < 100; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
         }
 
         // just make sure that the flowfiles have time to expire.
@@ -140,11 +127,11 @@ public class TestStandardFlowFileQueue {
         assertNull(pulled);
         assertEquals(100, expiredRecords.size());
 
-        final QueueSize activeSize = queue.getActiveQueueSize();
+        final QueueSize activeSize = queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize();
         assertEquals(0, activeSize.getObjectCount());
         assertEquals(0L, activeSize.getByteCount());
 
-        final QueueSize unackSize = queue.getUnacknowledgedQueueSize();
+        final QueueSize unackSize = queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize();
         assertEquals(0, unackSize.getObjectCount());
         assertEquals(0L, unackSize.getByteCount());
     }
@@ -158,13 +145,13 @@ public class TestStandardFlowFileQueue {
         assertFalse(queue.isFull());
 
         for (int i = 0; i < 9; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
             assertFalse(queue.isFull());
             assertFalse(queue.isEmpty());
             assertFalse(queue.isActiveQueueEmpty());
         }
 
-        queue.put(new TestFlowFile());
+        queue.put(new MockFlowFileRecord());
         assertTrue(queue.isFull());
         assertFalse(queue.isEmpty());
         assertFalse(queue.isActiveQueueEmpty());
@@ -193,11 +180,11 @@ public class TestStandardFlowFileQueue {
         queue.setFlowFileExpiration("10 millis");
 
         for (int i = 0; i < 9; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
             assertFalse(queue.isFull());
         }
 
-        queue.put(new TestFlowFile());
+        queue.put(new MockFlowFileRecord());
         assertTrue(queue.isFull());
 
         Thread.sleep(100L);
@@ -226,11 +213,11 @@ public class TestStandardFlowFileQueue {
         queue.setFlowFileExpiration("10 millis");
 
         for (int i = 0; i < 9; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
             assertFalse(queue.isFull());
         }
 
-        queue.put(new TestFlowFile());
+        queue.put(new MockFlowFileRecord());
         assertTrue(queue.isFull());
 
         Thread.sleep(100L);
@@ -259,11 +246,11 @@ public class TestStandardFlowFileQueue {
         queue.setFlowFileExpiration("10 millis");
 
         for (int i = 0; i < 9; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
             assertFalse(queue.isFull());
         }
 
-        queue.put(new TestFlowFile());
+        queue.put(new MockFlowFileRecord());
         assertTrue(queue.isFull());
 
         Thread.sleep(100L);
@@ -284,11 +271,11 @@ public class TestStandardFlowFileQueue {
         queue.setFlowFileExpiration("10 millis");
 
         for (int i = 0; i < 9; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
             assertFalse(queue.isFull());
         }
 
-        queue.put(new TestFlowFile());
+        queue.put(new MockFlowFileRecord());
         assertTrue(queue.isFull());
 
         Thread.sleep(100L);
@@ -306,25 +293,25 @@ public class TestStandardFlowFileQueue {
     @Test
     public void testSwapOutOccurs() {
         for (int i = 0; i < 10000; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
             assertEquals(0, swapManager.swapOutCalledCount);
             assertEquals(i + 1, queue.size().getObjectCount());
             assertEquals(i + 1, queue.size().getByteCount());
         }
 
         for (int i = 0; i < 9999; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
             assertEquals(0, swapManager.swapOutCalledCount);
             assertEquals(i + 10001, queue.size().getObjectCount());
             assertEquals(i + 10001, queue.size().getByteCount());
         }
 
-        queue.put(new TestFlowFile(1000));
+        queue.put(new MockFlowFileRecord(1000));
         assertEquals(1, swapManager.swapOutCalledCount);
         assertEquals(20000, queue.size().getObjectCount());
         assertEquals(20999, queue.size().getByteCount());
 
-        assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+        assertEquals(10000, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
     }
 
     @Test
@@ -335,13 +322,13 @@ public class TestStandardFlowFileQueue {
 
         long maxSize = 20000;
         for (int i = 1; i <= 20000; i++) {
-            queue.put(new TestFlowFile(maxSize - i));
+            queue.put(new MockFlowFileRecord(maxSize - i));
         }
 
         assertEquals(1, swapManager.swapOutCalledCount);
         assertEquals(20000, queue.size().getObjectCount());
 
-        assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+        assertEquals(10000, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
         final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>());
         assertEquals(10000, flowFiles.size());
         for (int i = 0; i < 10000; i++) {
@@ -352,37 +339,37 @@ public class TestStandardFlowFileQueue {
     @Test
     public void testSwapIn() {
         for (int i = 1; i <= 20000; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
         }
 
         assertEquals(1, swapManager.swappedOut.size());
-        queue.put(new TestFlowFile());
+        queue.put(new MockFlowFileRecord());
         assertEquals(1, swapManager.swappedOut.size());
 
         final Set<FlowFileRecord> exp = new HashSet<>();
         for (int i = 0; i < 9999; i++) {
             final FlowFileRecord flowFile = queue.poll(exp);
             assertNotNull(flowFile);
-            assertEquals(1, queue.getUnacknowledgedQueueSize().getObjectCount());
-            assertEquals(1, queue.getUnacknowledgedQueueSize().getByteCount());
+            assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getObjectCount());
+            assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getByteCount());
 
             queue.acknowledge(Collections.singleton(flowFile));
-            assertEquals(0, queue.getUnacknowledgedQueueSize().getObjectCount());
-            assertEquals(0, queue.getUnacknowledgedQueueSize().getByteCount());
+            assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getObjectCount());
+            assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getByteCount());
         }
 
         assertEquals(0, swapManager.swapInCalledCount);
-        assertEquals(1, queue.getActiveQueueSize().getObjectCount());
+        assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
         assertNotNull(queue.poll(exp));
 
         assertEquals(0, swapManager.swapInCalledCount);
-        assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+        assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
 
         assertEquals(1, swapManager.swapOutCalledCount);
 
         assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
         assertEquals(1, swapManager.swapInCalledCount);
-        assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
+        assertEquals(9999, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
 
         assertTrue(swapManager.swappedOut.isEmpty());
 
@@ -392,14 +379,14 @@ public class TestStandardFlowFileQueue {
     @Test
     public void testSwapInWhenThresholdIsLessThanSwapSize() {
         // create a queue where the swap threshold is less than 10k
-        queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, 0L, "0 B");
+        queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, 0L, "0 B");
 
         for (int i = 1; i <= 20000; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
         }
 
         assertEquals(1, swapManager.swappedOut.size());
-        queue.put(new TestFlowFile());
+        queue.put(new MockFlowFileRecord());
         assertEquals(1, swapManager.swappedOut.size());
 
         final Set<FlowFileRecord> exp = new HashSet<>();
@@ -412,26 +399,26 @@ public class TestStandardFlowFileQueue {
         for (int i = 0; i < 999; i++) { //
             final FlowFileRecord flowFile = queue.poll(exp);
             assertNotNull(flowFile);
-            assertEquals(1, queue.getUnacknowledgedQueueSize().getObjectCount());
-            assertEquals(1, queue.getUnacknowledgedQueueSize().getByteCount());
+            assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getObjectCount());
+            assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getByteCount());
 
             queue.acknowledge(Collections.singleton(flowFile));
-            assertEquals(0, queue.getUnacknowledgedQueueSize().getObjectCount());
-            assertEquals(0, queue.getUnacknowledgedQueueSize().getByteCount());
+            assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getObjectCount());
+            assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getByteCount());
         }
 
         assertEquals(0, swapManager.swapInCalledCount);
-        assertEquals(1, queue.getActiveQueueSize().getObjectCount());
+        assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
         assertNotNull(queue.poll(exp));
 
         assertEquals(0, swapManager.swapInCalledCount);
-        assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+        assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
 
         assertEquals(1, swapManager.swapOutCalledCount);
 
         assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
         assertEquals(1, swapManager.swapInCalledCount);
-        assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
+        assertEquals(9999, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
 
         assertTrue(swapManager.swappedOut.isEmpty());
 
@@ -441,7 +428,7 @@ public class TestStandardFlowFileQueue {
     @Test
     public void testQueueCountsUpdatedWhenIncompleteSwapFile() {
         for (int i = 1; i <= 20000; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
         }
 
         assertEquals(20000, queue.size().getObjectCount());
@@ -502,7 +489,7 @@ public class TestStandardFlowFileQueue {
     @Test(timeout = 120000)
     public void testDropSwappedFlowFiles() {
         for (int i = 1; i <= 30000; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
         }
 
         assertEquals(2, swapManager.swappedOut.size());
@@ -524,7 +511,7 @@ public class TestStandardFlowFileQueue {
     @Test(timeout = 5000)
     public void testListFlowFilesOnlyActiveQueue() throws InterruptedException {
         for (int i = 0; i < 9999; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
         }
 
         final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 10000);
@@ -544,7 +531,7 @@ public class TestStandardFlowFileQueue {
     @Test(timeout = 5000)
     public void testListFlowFilesResultsLimited() throws InterruptedException {
         for (int i = 0; i < 30050; i++) {
-            queue.put(new TestFlowFile());
+            queue.put(new MockFlowFileRecord());
         }
 
         final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100);
@@ -565,7 +552,7 @@ public class TestStandardFlowFileQueue {
         Collection<FlowFileRecord> tff = new ArrayList<>();
         //Swap Size is 10000 records, so 30000 is equal to 3 swap files.
         for (int i = 0; i < 30000; i++) {
-            tff.add(new TestFlowFile());
+            tff.add(new MockFlowFileRecord());
         }
 
         queue.putAll(tff);
@@ -588,7 +575,7 @@ public class TestStandardFlowFileQueue {
     public void testOOMEFollowedBySuccessfulSwapIn() {
         final List<FlowFileRecord> flowFiles = new ArrayList<>();
         for (int i = 0; i < 50000; i++) {
-            flowFiles.add(new TestFlowFile());
+            flowFiles.add(new MockFlowFileRecord());
         }
 
         queue.putAll(flowFiles);
@@ -633,224 +620,13 @@ public class TestStandardFlowFileQueue {
 
         queue.acknowledge(flowFiles);
         assertNull(queue.poll(expiredRecords));
-        assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+        assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
         assertEquals(0, queue.size().getObjectCount());
 
         assertTrue(swapManager.swappedOut.isEmpty());
     }
 
 
-    private class TestSwapManager implements FlowFileSwapManager {
-        private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
-        int swapOutCalledCount = 0;
-        int swapInCalledCount = 0;
-
-        private int incompleteSwapFileRecordsToInclude = -1;
-
-        private int failSwapInAfterN = -1;
-        private Throwable failSwapInFailure = null;
-
-        private void setSwapInFailure(final Throwable t) {
-            this.failSwapInFailure = t;
-        }
-
-        @Override
-        public void initialize(final SwapManagerInitializationContext initializationContext) {
-
-        }
-
-        public void enableIncompleteSwapFileException(final int flowFilesToInclude) {
-            incompleteSwapFileRecordsToInclude = flowFilesToInclude;
-        }
-
-        @Override
-        public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
-            swapOutCalledCount++;
-            final String location = UUID.randomUUID().toString();
-            swappedOut.put(location, new ArrayList<>(flowFiles));
-            return location;
-        }
-
-        private void throwIncompleteIfNecessary(final String swapLocation, final boolean remove) throws IOException {
-            if (incompleteSwapFileRecordsToInclude > -1) {
-                final SwapSummary summary = getSwapSummary(swapLocation);
-
-                final List<FlowFileRecord> records;
-                if (remove) {
-                    records = swappedOut.remove(swapLocation);
-                } else {
-                    records = swappedOut.get(swapLocation);
-                }
-
-                final List<FlowFileRecord> partial = records.subList(0, incompleteSwapFileRecordsToInclude);
-                final SwapContents partialContents = new StandardSwapContents(summary, partial);
-                throw new IncompleteSwapFileException(swapLocation, partialContents);
-            }
-
-            if (swapInCalledCount > failSwapInAfterN && failSwapInAfterN > -1) {
-                if (failSwapInFailure instanceof RuntimeException) {
-                    throw (RuntimeException) failSwapInFailure;
-                }
-                if (failSwapInFailure instanceof Error) {
-                    throw (Error) failSwapInFailure;
-                }
-
-                throw new RuntimeException(failSwapInFailure);
-            }
-        }
-
-        @Override
-        public SwapContents peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
-            throwIncompleteIfNecessary(swapLocation, false);
-            return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.get(swapLocation));
-        }
-
-        @Override
-        public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
-            swapInCalledCount++;
-            throwIncompleteIfNecessary(swapLocation, true);
-            return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.remove(swapLocation));
-        }
-
-        @Override
-        public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
-            return new ArrayList<>(swappedOut.keySet());
-        }
-
-        @Override
-        public SwapSummary getSwapSummary(String swapLocation) throws IOException {
-            final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
-            if (flowFiles == null) {
-                return StandardSwapSummary.EMPTY_SUMMARY;
-            }
-
-            int count = 0;
-            long size = 0L;
-            Long max = null;
-            final List<ResourceClaim> resourceClaims = new ArrayList<>();
-            for (final FlowFileRecord flowFile : flowFiles) {
-                count++;
-                size += flowFile.getSize();
-                if (max == null || flowFile.getId() > max) {
-                    max = flowFile.getId();
-                }
-
-                if (flowFile.getContentClaim() != null) {
-                    resourceClaims.add(flowFile.getContentClaim().getResourceClaim());
-                }
-            }
-
-            return new StandardSwapSummary(new QueueSize(count, size), max, resourceClaims);
-        }
-
-        @Override
-        public void purge() {
-            swappedOut.clear();
-        }
-    }
-
-
-    private static class TestFlowFile implements FlowFileRecord {
-        private static final AtomicLong idGenerator = new AtomicLong(0L);
-
-        private final long id = idGenerator.getAndIncrement();
-        private final long entryDate = System.currentTimeMillis();
-        private final Map<String, String> attributes;
-        private final long size;
-
-        public TestFlowFile() {
-            this(1L);
-        }
-
-        public TestFlowFile(final long size) {
-            this(new HashMap<>(), size);
-        }
-
-        public TestFlowFile(final Map<String, String> attributes, final long size) {
-            this.attributes = attributes;
-            this.size = size;
-
-            if (!attributes.containsKey(CoreAttributes.UUID.key())) {
-                attributes.put(CoreAttributes.UUID.key(), createFakeUUID());
-            }
-        }
-
-        private  String createFakeUUID(){
-            final String s=Long.toHexString(id);
-            return new StringBuffer("00000000-0000-0000-0000000000000000".substring(0,(35-s.length()))+s).insert(23, '-').toString();
-        }
-
-        @Override
-        public long getId() {
-            return id;
-        }
-
-        @Override
-        public long getEntryDate() {
-            return entryDate;
-        }
-
-        @Override
-        public long getLineageStartDate() {
-            return entryDate;
-        }
-
-        @Override
-        public Long getLastQueueDate() {
-            return null;
-        }
-
-        @Override
-        public boolean isPenalized() {
-            return false;
-        }
-
-        @Override
-        public String getAttribute(String key) {
-            return attributes.get(key);
-        }
-
-        @Override
-        public long getSize() {
-            return size;
-        }
-
-        @Override
-        public Map<String, String> getAttributes() {
-            return Collections.unmodifiableMap(attributes);
-        }
-
-        @Override
-        public int compareTo(final FlowFile o) {
-            return Long.compare(id, o.getId());
-        }
-
-        @Override
-        public long getPenaltyExpirationMillis() {
-            return 0;
-        }
-
-        @Override
-        public ContentClaim getContentClaim() {
-            return null;
-        }
-
-        @Override
-        public long getContentClaimOffset() {
-            return 0;
-        }
-
-        @Override
-        public long getLineageStartIndex() {
-            return 0;
-        }
-
-        @Override
-        public long getQueueDateIndex() {
-            return 0;
-        }
-    }
-
     private static class FlowFileSizePrioritizer implements FlowFilePrioritizer {
         @Override
         public int compare(final FlowFile o1, final FlowFile o2) {