You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/14 03:17:03 UTC

[nifi] branch master updated: NIFI-7011: This closes #3983. SwappablePriorityQueue contains two internal data structures: activeQueue, swapQueue. activeQueue is intended to be pulled from for processing. swapQueue is intended to hold FlowFiles that are waiting to be swapped out. SinWe want to ensure that we first swap in any data that has already been swapped out before processing the swap queue, in order to ensure that we process the data in the correct order. This fix ddresses an issue where data was being swapped ou [...]

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 66d5ab8  NIFI-7011: This closes #3983. SwappablePriorityQueue contains two internal data structures: activeQueue, swapQueue. activeQueue is intended to be pulled from for processing. swapQueue is intended to hold FlowFiles that are waiting to be swapped out. SinWe want to ensure that we first swap in any data that has already been swapped out before processing the swap queue, in order to ensure that we process the data in the correct order. This fix ddresses an issue where data w [...]
66d5ab8 is described below

commit 66d5ab80eb22f535da0898ae0d6a4a5da2dd7bd9
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Jan 10 11:27:57 2020 -0500

    NIFI-7011: This closes #3983. SwappablePriorityQueue contains two internal data structures: activeQueue, swapQueue. activeQueue is intended to be pulled from for processing. swapQueue is intended to hold FlowFiles that are waiting to be swapped out. SinWe want to ensure that we first swap in any data that has already been swapped out before processing the swap queue, in order to ensure that we process the data in the correct order. This fix ddresses an issue where data was being swapp [...]
    NIFI-7011: Addressed corner case where data could be inserted out of order still if added while swapping was taking place
    NIFI-7011: Fixed ordering issue with swap queue that can occur if data is migrated from swap queue to active queue instead of being swapped out
---
 .../controller/queue/SwappablePriorityQueue.java   |  54 ++---
 .../repository/StandardProcessSession.java         |   1 -
 .../nifi/controller/TestStandardFlowFileQueue.java |  22 --
 .../clustered/TestSwappablePriorityQueue.java      | 227 ++++++++++++++++++++-
 .../integration/swap/StandaloneSwapFileIT.java     |   4 +-
 5 files changed, 255 insertions(+), 53 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index b81bd3f..c442838 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -162,6 +162,9 @@ public class SwappablePriorityQueue {
         }
 
         migrateSwapToActive();
+        if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
+            return;
+        }
 
         final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
 
@@ -171,10 +174,11 @@ public class SwappablePriorityQueue {
             originalSwapQueueBytes += flowFile.getSize();
         }
 
-        // Create a new Priority queue with the prioritizers that are set, but reverse the
-        // prioritizers because we want to pull the lowest-priority FlowFiles to swap out
-        final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new QueuePrioritizer(getPriorities())));
-        tempQueue.addAll(activeQueue);
+        // Create a new Priority queue with the same prioritizers that are set for this queue. We want to swap out the highest priority data first, because
+        // whatever data we don't write out to a swap file (because there isn't enough to fill a swap file) will be added back to the swap queue.
+        // Since the swap queue cannot be processed until all swap files, we want to ensure that only the lowest priority data goes back onto it. Which means
+        // that we must swap out the highest priority data that is currently on the swap queue.
+        final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(swapQueue.size(), new QueuePrioritizer(getPriorities()));
         tempQueue.addAll(swapQueue);
 
         long bytesSwappedOut = 0L;
@@ -221,23 +225,14 @@ public class SwappablePriorityQueue {
         // swap queue. Then add the records back to the active queue.
         swapQueue.clear();
         long updatedSwapQueueBytes = 0L;
-        while (tempQueue.size() > swapThreshold) {
-            final FlowFileRecord record = tempQueue.poll();
+        FlowFileRecord record;
+        while ((record = tempQueue.poll()) != null) {
             swapQueue.add(record);
             updatedSwapQueueBytes += record.getSize();
         }
 
         Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue
 
-        // replace the contents of the active queue, since we've merged it with the swap queue.
-        activeQueue.clear();
-        FlowFileRecord toRequeue;
-        long activeQueueBytes = 0L;
-        while ((toRequeue = tempQueue.poll()) != null) {
-            activeQueue.offer(toRequeue);
-            activeQueueBytes += toRequeue.getSize();
-        }
-
         boolean updated = false;
         while (!updated) {
             final FlowFileQueueSize originalSize = getFlowFileQueueSize();
@@ -245,13 +240,13 @@ public class SwappablePriorityQueue {
             final int addedSwapRecords = swapQueue.size() - originalSwapQueueCount;
             final long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes;
 
-            final FlowFileQueueSize newSize = new FlowFileQueueSize(activeQueue.size(), activeQueueBytes,
+            final FlowFileQueueSize newSize = new FlowFileQueueSize(originalSize.getActiveCount(), originalSize.getActiveBytes(),
                 originalSize.getSwappedCount() + addedSwapRecords + flowFilesSwappedOut,
                 originalSize.getSwappedBytes() + addedSwapBytes + bytesSwappedOut,
                 originalSize.getSwapFileCount() + numSwapFiles,
                 originalSize.getUnacknowledgedCount(), originalSize.getUnacknowledgedBytes());
-            updated = updateSize(originalSize, newSize);
 
+            updated = updateSize(originalSize, newSize);
             if (updated) {
                 logIfNegative(originalSize, newSize, "swap");
             }
@@ -286,9 +281,7 @@ public class SwappablePriorityQueue {
         // Calling this method when records are polled prevents this condition by migrating FlowFiles from the
         // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
         // to disk, because we want them to be swapped back in in the same order that they were swapped out.
-
-        final int activeQueueSize = activeQueue.size();
-        if (activeQueueSize > 0 && activeQueueSize > swapThreshold - SWAP_RECORD_POLL_SIZE) {
+        if (!activeQueue.isEmpty()) {
             return;
         }
 
@@ -315,20 +308,33 @@ public class SwappablePriorityQueue {
             return;
         }
 
+        // Swap Queue is not currently ordered. We want to migrate the highest priority FlowFiles to the Active Queue, then re-queue the lowest priority items.
+        final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(swapQueue.size(), new QueuePrioritizer(getPriorities()));
+        tempQueue.addAll(swapQueue);
+
         int recordsMigrated = 0;
         long bytesMigrated = 0L;
-        final Iterator<FlowFileRecord> swapItr = swapQueue.iterator();
-        while (activeQueue.size() < swapThreshold && swapItr.hasNext()) {
-            final FlowFileRecord toMigrate = swapItr.next();
+        while (activeQueue.size() < swapThreshold) {
+            final FlowFileRecord toMigrate = tempQueue.poll();
+            if (toMigrate == null) {
+                break;
+            }
+
             activeQueue.add(toMigrate);
             bytesMigrated += toMigrate.getSize();
             recordsMigrated++;
-            swapItr.remove();
+        }
+
+        swapQueue.clear();
+        FlowFileRecord toRequeue;
+        while ((toRequeue = tempQueue.poll()) != null) {
+            swapQueue.add(toRequeue);
         }
 
         if (recordsMigrated > 0) {
             incrementActiveQueueSize(recordsMigrated, bytesMigrated);
             incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0);
+            logger.debug("Migrated {} FlowFiles from swap queue to active queue for {}", recordsMigrated, this);
         }
 
         if (size.getSwappedCount() == 0) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index da7a6ee..089ac90 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -56,7 +56,6 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
-import org.rocksdb.Checkpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index b454d2d..4cb28a2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -315,28 +315,6 @@ public class TestStandardFlowFileQueue {
     }
 
     @Test
-    public void testLowestPrioritySwappedOutFirst() {
-        final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
-        prioritizers.add(new FlowFileSizePrioritizer());
-        queue.setPriorities(prioritizers);
-
-        long maxSize = 20000;
-        for (int i = 1; i <= 20000; i++) {
-            queue.put(new MockFlowFileRecord(maxSize - i));
-        }
-
-        assertEquals(1, swapManager.swapOutCalledCount);
-        assertEquals(20000, queue.size().getObjectCount());
-
-        assertEquals(10000, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
-        final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>());
-        assertEquals(10000, flowFiles.size());
-        for (int i = 0; i < 10000; i++) {
-            assertEquals(i, flowFiles.get(i).getSize());
-        }
-    }
-
-    @Test
     public void testSwapIn() {
         for (int i = 1; i <= 20000; i++) {
             queue.put(new MockFlowFileRecord());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
index ef1a063..79be6ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
@@ -26,9 +26,11 @@ import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.queue.SwappablePriorityQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.StringUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -78,6 +80,193 @@ public class TestSwappablePriorityQueue {
         queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter, flowFileQueue, dropAction, "local");
     }
 
+    @Test
+    public void testPrioritizersBigQueue() {
+        final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() {
+            @Override
+            public int compare(final FlowFile o1, final FlowFile o2) {
+                final int i1 = Integer.parseInt(o1.getAttribute("i"));
+                final int i2 = Integer.parseInt(o2.getAttribute("i"));
+                return Integer.compare(i1, i2);
+            }
+        };
+
+        queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
+        final int iterations = 29000;
+
+        for (int i=0; i < iterations; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(i);
+            flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i)));
+            queue.put(flowFile);
+        }
+
+        for (int i=0; i < iterations; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(i + iterations);
+            flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i + iterations)));
+
+            final FlowFileRecord polled = queue.poll(Collections.emptySet(), 0L);
+            assertEquals(polled.getAttribute("i"), String.valueOf(i));
+
+            queue.put(flowFile);
+        }
+
+        // Make sure that the data is pulled from the queue and added back a couple of times.
+        // This will trigger swapping to occur, but also leave a lot of data in memory on the queue.
+        // This specifically tests the edge case where data is swapped out, and we want to make sure that
+        // when we read from the queue, that we swap the data back in before processing anything on the
+        // pending 'swap queue' internally.
+        repopulateQueue();
+        repopulateQueue();
+
+        int i=iterations;
+        FlowFileRecord flowFile;
+        while ((flowFile = queue.poll(Collections.emptySet(), 0)) != null) {
+            assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
+            i++;
+        }
+    }
+
+
+    @Test
+    public void testOrderingWithCornerCases() {
+        final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() {
+            @Override
+            public int compare(final FlowFile o1, final FlowFile o2) {
+                final int i1 = Integer.parseInt(o1.getAttribute("i"));
+                final int i2 = Integer.parseInt(o2.getAttribute("i"));
+                return Integer.compare(i1, i2);
+            }
+        };
+
+        queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
+
+        for (final int queueSize : new int[] {1, 9999, 10_000, 10_001, 19_999, 20_000, 20_001}) {
+            System.out.println("Queue Size: " + queueSize);
+
+            for (int i=0; i < queueSize; i++) {
+                final MockFlowFile flowFile = new MockFlowFile(i);
+                flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i)));
+                queue.put(flowFile);
+            }
+
+            for (int i=0; i < queueSize; i++) {
+                final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 0);
+                assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
+            }
+
+            assertNull(queue.poll(Collections.emptySet(), 0));
+        }
+    }
+
+    @Test
+    public void testPrioritizerWhenOutOfOrderDataEntersSwapQueue() {
+        final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() {
+            @Override
+            public int compare(final FlowFile o1, final FlowFile o2) {
+                final int i1 = Integer.parseInt(o1.getAttribute("i"));
+                final int i2 = Integer.parseInt(o2.getAttribute("i"));
+                return Integer.compare(i1, i2);
+            }
+        };
+
+        queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
+
+        // Add 10,000 FlowFiles to the queue. These will all go to the active queue.
+        final int iterations = 10000;
+        for (int i=0; i < iterations; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(i);
+            flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i)));
+            queue.put(flowFile);
+        }
+
+        // Added 3 FlowFiles to the queue. These will all go to the Swap Queue.
+        for (final String iValue : new String[] {"10000", "-5", "8000"}) {
+            final MockFlowFile swapQueueFlowFile1 = new MockFlowFile(10_000);
+            swapQueueFlowFile1.putAttributes(Collections.singletonMap("i", iValue));
+            queue.put(swapQueueFlowFile1);
+        }
+
+        // The first 10,000 should be ordered. Then all FlowFiles on the swap queue should be transferred over, as a single unit, just as they would be in a swap file.
+        for (int i=0; i < iterations; i++) {
+            final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 0);
+            assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
+        }
+
+        for (final String iValue : new String[] {"-5", "8000", "10000"}) {
+            final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 0);
+            assertEquals(iValue, flowFile.getAttribute("i"));
+        }
+    }
+
+    @Test
+    public void testPrioritizersDataAddedAfterSwapOccurs() {
+        final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() {
+            @Override
+            public int compare(final FlowFile o1, final FlowFile o2) {
+                final int i1 = Integer.parseInt(o1.getAttribute("i"));
+                final int i2 = Integer.parseInt(o2.getAttribute("i"));
+                return Integer.compare(i1, i2);
+            }
+        };
+
+        queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
+        final int iterations = 29000;
+
+        for (int i=0; i < iterations; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(i);
+            flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i)));
+            queue.put(flowFile);
+        }
+
+        for (int i=0; i < iterations; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(i + iterations);
+            flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i + iterations)));
+
+            final FlowFileRecord polled = queue.poll(Collections.emptySet(), 0L);
+            assertEquals(polled.getAttribute("i"), String.valueOf(i));
+
+            queue.put(flowFile);
+        }
+
+        // Make sure that the data is pulled from the queue and added back a couple of times.
+        // This will trigger swapping to occur, but also leave a lot of data in memory on the queue.
+        // This specifically tests the edge case where data is swapped out, and we want to make sure that
+        // when we read from the queue, that we swap the data back in before processing anything on the
+        // pending 'swap queue' internally.
+        repopulateQueue();
+        repopulateQueue();
+
+        // Add enough data for another swap file to get created.
+        final int baseI = iterations * 2;
+        for (int i=0; i < 10_000; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(i);
+            flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(baseI + i)));
+            queue.put(flowFile);
+        }
+
+        repopulateQueue();
+
+        int i=iterations;
+        FlowFileRecord flowFile;
+        while ((flowFile = queue.poll(Collections.emptySet(), 0)) != null) {
+            assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
+            i++;
+        }
+    }
+
+    private void repopulateQueue() {
+        final List<String> attrs = new ArrayList<>();
+        final List<FlowFileRecord> ffs = new ArrayList<>();
+        FlowFileRecord ff;
+        while ((ff = queue.poll(Collections.emptySet(), 0L)) != null) {
+            ffs.add(ff);
+            attrs.add(ff.getAttribute("i"));
+        }
+
+        ffs.forEach(queue::put);
+        System.out.println(StringUtils.join(attrs, ", "));
+    }
+
 
     @Test
     public void testSwapOutFailureLeavesCorrectQueueSize() {
@@ -241,13 +430,45 @@ public class TestSwappablePriorityQueue {
         assertEquals(20000, queue.size().getObjectCount());
 
         assertEquals(10000, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());
-        final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>(), 500000);
-        assertEquals(10000, flowFiles.size());
+
+        // The first 10,000 FlowFiles to be added to the queue will be sorted by size (first 10,000 because that's the swap threshold, by size because of the prioritizer).
+        // The next 10,000 spill over to the swap queue. So we expect the first 10,000 FlowFiles to be size 10,000 to 20,000. Then the next 10,000 to be sized 0 to 9,999.
+        final List<FlowFileRecord> firstBatch = queue.poll(Integer.MAX_VALUE, Collections.emptySet(), 0);
+        assertEquals(10000, firstBatch.size());
         for (int i = 0; i < 10000; i++) {
-            assertEquals(i, flowFiles.get(i).getSize());
+            assertEquals(10_000 + i, firstBatch.get(i).getSize());
         }
+
+        final List<FlowFileRecord> secondBatch = queue.poll(Integer.MAX_VALUE, Collections.emptySet(), 0);
+        assertEquals(10000, secondBatch.size());
+        for (int i = 0; i < 10000; i++) {
+            assertEquals(i, secondBatch.get(i).getSize());
+        }
+
     }
 
+    @Test
+    public void testPrioritiesKeptIntactBeforeSwap() {
+        final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
+        prioritizers.add((o1, o2) -> Long.compare(o1.getSize(), o2.getSize()));
+        queue.setPriorities(prioritizers);
+
+        int maxSize = 9999;
+        for (int i = 1; i <= maxSize; i++) {
+            queue.put(new MockFlowFileRecord(maxSize - i));
+        }
+
+        assertEquals(0, swapManager.swapOutCalledCount);
+        assertEquals(maxSize, queue.size().getObjectCount());
+
+        assertEquals(9999, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());
+
+        FlowFileRecord flowFile;
+        int i=0;
+        while ((flowFile = queue.poll(Collections.emptySet(), 0L)) != null) {
+            assertEquals(i++, flowFile.getSize());
+        }
+    }
 
     @Test
     public void testSwapIn() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java
index c93bf7c..88470ff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java
@@ -35,10 +35,8 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 
 public class StandaloneSwapFileIT extends FrameworkIntegrationTest {
-    @Test
+    @Test(timeout=60_000)
     public void testSwapOnRestart() throws ExecutionException, InterruptedException, IOException {
-        Thread.sleep(20000L);
-
         final ProcessorNode generator = createProcessorNode(GenerateProcessor.class);
         generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(), "60000"));