You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/02/04 19:29:49 UTC

[nifi] branch master updated: NIFI-5997: If we swap out data, ensure that we do not increment the size of the queue by the size of the data that we failed to swap out. Also, if the FlowFile Repo does not know about a given swap file, do not restore it on restart

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 83ac191  NIFI-5997: If we swap out data, ensure that we do not increment the size of the queue by the size of the data that we failed to swap out. Also, if the FlowFile Repo does not know about a given swap file, do not restore it on restart
83ac191 is described below

commit 83ac191736e8036f82da467ceb1940b50d9886f0
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Feb 1 13:10:51 2019 -0500

    NIFI-5997: If we swap out data, ensure that we do not increment the size of the queue by the size of the data that we failed to swap out. Also, if the FlowFile Repo does not know about a given swap file, do not restore it on restart
    
    This closes #3290.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../exception/FlowFileAccessException.java         |   2 +-
 .../java/org/apache/nifi/wali/HashMapSnapshot.java |  20 ++--
 .../nifi/wali/SequentialAccessWriteAheadLog.java   |  20 +++-
 .../org/apache/nifi/wali/WriteAheadSnapshot.java   |   3 +
 .../main/java/org/wali/WriteAheadRepository.java   |   2 +-
 .../controller/repository/FlowFileRepository.java  |  26 ++++-
 .../nifi-framework/nifi-framework-core/pom.xml     |   1 +
 .../nifi/controller/FileSystemSwapManager.java     |  20 +++-
 .../controller/queue/SwappablePriorityQueue.java   |   8 +-
 .../repository/StandardProcessSession.java         |  30 +++++-
 .../repository/VolatileFlowFileRepository.java     |  14 ++-
 .../repository/WriteAheadFlowFileRepository.java   |  88 +++++++++++++---
 .../apache/nifi/controller/MockSwapManager.java    |  38 +++++--
 .../nifi/controller/TestFileSystemSwapManager.java | 114 ++++++++++++++++++---
 .../clustered/TestSwappablePriorityQueue.java      |  59 ++++++++---
 .../repository/TestStandardProcessSession.java     | 113 +++++++++++++-------
 .../TestWriteAheadFlowFileRepository.java          |  96 +++++++++++++++++
 .../src/test/resources/swap/444-old-swap-file.swap | Bin 0 -> 1730054 bytes
 18 files changed, 536 insertions(+), 118 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java b/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java
index c7e9c22..c64ea1c 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java
@@ -18,7 +18,7 @@ package org.apache.nifi.processor.exception;
 
 /**
  * Indicates an issue occurred while accessing the content of a FlowFile, such
- * as an IOException.
+ * as an IOException, or while obtaining a reference to the FlowFile.
  *
  */
 public class FlowFileAccessException extends RuntimeException {
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
index 0dad62c..002ecd2 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
@@ -17,6 +17,12 @@
 
 package org.apache.nifi.wali;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.SerDeFactory;
+import org.wali.UpdateType;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -37,12 +43,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.SerDe;
-import org.wali.SerDeFactory;
-import org.wali.UpdateType;
-
 public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T> {
     private static final Logger logger = LoggerFactory.getLogger(HashMapSnapshot.class);
     private static final int ENCODING_VERSION = 1;
@@ -216,10 +216,14 @@ public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T
         return recordMap.get(recordId);
     }
 
-
     @Override
     public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId) {
-        return new Snapshot(new HashMap<>(recordMap), new HashSet<>(swapLocations), maxTransactionId);
+        return prepareSnapshot(maxTransactionId, this.swapLocations);
+    }
+
+    @Override
+    public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId, final Set<String> swapFileLocations) {
+        return new Snapshot(new HashMap<>(recordMap), new HashSet<>(swapFileLocations), maxTransactionId);
     }
 
     private int getVersion() {
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
index 11eb31c..240a212 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -65,6 +66,7 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
     private final File journalsDirectory;
     private final SerDeFactory<T> serdeFactory;
     private final SyncListener syncListener;
+    private final Set<String> recoveredSwapLocations = new HashSet<>();
 
     private final ReadWriteLock journalRWLock = new ReentrantReadWriteLock();
     private final Lock journalReadLock = journalRWLock.readLock();
@@ -144,6 +146,7 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
         final long recoverStart = System.nanoTime();
         recovered = true;
         snapshotRecovery = snapshot.recover();
+        this.recoveredSwapLocations.addAll(snapshotRecovery.getRecoveredSwapLocations());
 
         final long snapshotRecoveryMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - recoverStart);
 
@@ -212,7 +215,9 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
         final long recoveryMillis = TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS);
         logger.info("Successfully recovered {} records in {} milliseconds. Now checkpointing to ensure that Write-Ahead Log is in a consistent state", recoveredRecords.size(), recoveryMillis);
 
-        checkpoint();
+        this.recoveredSwapLocations.addAll(swapLocations);
+
+        checkpoint(this.recoveredSwapLocations);
 
         return recoveredRecords.values();
     }
@@ -238,11 +243,15 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
             throw new IllegalStateException("Cannot retrieve the Recovered Swap Locations until record recovery has been performed");
         }
 
-        return snapshotRecovery.getRecoveredSwapLocations();
+        return Collections.unmodifiableSet(this.recoveredSwapLocations);
     }
 
     @Override
     public int checkpoint() throws IOException {
+        return checkpoint(null);
+    }
+
+    private int checkpoint(final Set<String> swapLocations) throws IOException {
         final SnapshotCapture<T> snapshotCapture;
 
         final long startNanos = System.nanoTime();
@@ -276,7 +285,12 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
             final File[] existingFiles = journalsDirectory.listFiles(this::isJournalFile);
             existingJournals = (existingFiles == null) ? new File[0] : existingFiles;
 
-            snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1);
+            if (swapLocations == null) {
+                snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1);
+            } else {
+                snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1, swapLocations);
+            }
+
 
             // Create a new journal. We name the journal file <next transaction id>.journal but it is possible
             // that we could have an empty journal file already created. If this happens, we don't want to create
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
index a4cbcd2..fd7cfd8 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
@@ -19,10 +19,13 @@ package org.apache.nifi.wali;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Set;
 
 public interface WriteAheadSnapshot<T> {
     SnapshotCapture<T> prepareSnapshot(long maxTransactionId);
 
+    SnapshotCapture<T> prepareSnapshot(long maxTransactionId, Set<String> swapLocations);
+
     void writeSnapshot(SnapshotCapture<T> snapshot) throws IOException;
 
     SnapshotRecovery<T> recover() throws IOException;
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
index 05fc8a5..b7f18ab 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
@@ -65,7 +65,7 @@ public interface WriteAheadRepository<T> {
      * if power is lost or the Operating System crashes
      * @throws IOException if failure to update repo
      * @throws IllegalArgumentException if multiple records within the given
-     * Collection have the same ID, as specified by {@link Record#getId()}
+     * Collection have the same ID, as specified by {@link SerDe#getRecordIdentifier(Object)}
      * method
      *
      * @return the index of the Partition that performed the update
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 6560c0a..b9ff249 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -16,14 +16,14 @@
  */
 package org.apache.nifi.controller.repository;
 
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-
 /**
  * Implementations must be thread safe
  *
@@ -128,4 +128,24 @@ public interface FlowFileRepository extends Closeable {
      * @throws IOException if swap fails
      */
     void swapFlowFilesIn(String swapLocation, List<FlowFileRecord> flowFileRecords, FlowFileQueue flowFileQueue) throws IOException;
+
+    /**
+     * <p>
+     * Determines whether or not the given swap location suffix is a valid, known location according to this FlowFileRepository. Note that while
+     * the {@link #swapFlowFilesIn(String, List, FlowFileQueue)} and {@link #swapFlowFilesOut(List, FlowFileQueue, String)} methods expect
+     * a full "swap location" this method expects only the "suffix" of a swap location. For example, if the location points to a file, this method
+     * would expect only the filename, not the full path.
+     * </p>
+     *
+     * <p>
+     * This method differs from the others because the other methods want to store the swap location or recover from a given location. However,
+     * this method is used to verify that the location is known. If, for any reason, NiFi is stopped, its FlowFile Repository is relocated to a new
+     * location (for example, a different disk partition), and NiFi is restarted, the swap location would not match if we used the full location. Therefore,
+     * by using only the "suffix" (i.e. the filename for a file-based implementation), we can avoid worrying about relocation.
+     * </p>
+     *
+     * @param swapLocationSuffix the suffix of the location to check
+     * @return <code>true</code> if the swap location is known and valid, <code>false</code> otherwise
+     */
+    boolean isValidSwapLocationSuffix(String swapLocationSuffix);
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 6bb1ea8..4b9af46 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -239,6 +239,7 @@
                         <exclude>src/test/resources/bye.txt</exclude>
                         <exclude>src/test/resources/old-swap-file.swap</exclude>
                         <exclude>src/test/resources/xxe_template.xml</exclude>
+                        <exclude>src/test/resources/swap/444-old-swap-file.swap</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 5f8f925..b2717c2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller;
 
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
@@ -27,6 +28,8 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.swap.SchemaSwapDeserializer;
 import org.apache.nifi.controller.swap.SchemaSwapSerializer;
 import org.apache.nifi.controller.swap.SimpleSwapDeserializer;
+import org.apache.nifi.controller.swap.StandardSwapContents;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
 import org.apache.nifi.controller.swap.SwapDeserializer;
 import org.apache.nifi.controller.swap.SwapSerializer;
 import org.apache.nifi.events.EventReporter;
@@ -95,14 +98,17 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     }
 
     public FileSystemSwapManager(final NiFiProperties nifiProperties) {
-        final Path flowFileRepoPath = nifiProperties.getFlowFileRepositoryPath();
+        this(nifiProperties.getFlowFileRepositoryPath());
+    }
 
+    public FileSystemSwapManager(final Path flowFileRepoPath) {
         this.storageDirectory = flowFileRepoPath.resolve("swap").toFile();
         if (!storageDirectory.exists() && !storageDirectory.mkdirs()) {
             throw new RuntimeException("Cannot create Swap Storage directory " + storageDirectory.getAbsolutePath());
         }
     }
 
+
     @Override
     public synchronized void initialize(final SwapManagerInitializationContext initializationContext) {
         this.claimManager = initializationContext.getResourceClaimManager();
@@ -152,6 +158,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     @Override
     public SwapContents swapIn(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
         final File swapFile = new File(swapLocation);
+
+        final boolean validLocation = flowFileRepository.isValidSwapLocationSuffix(swapFile.getName());
+        if (!validLocation) {
+            warn("Cannot swap in FlowFiles from location " + swapLocation + " because the FlowFile Repository does not know about this Swap Location. " +
+                "This file should be manually removed. This typically occurs when a Swap File is written but the FlowFile Repository is not updated yet to reflect this. " +
+                "This is generally not a cause for concern, but may be indicative of a failure to update the FlowFile Repository.");
+            final SwapSummary swapSummary = new StandardSwapSummary(new QueueSize(0, 0), 0L, Collections.emptyList());
+            return new StandardSwapContents(swapSummary, Collections.emptyList());
+        }
+
         final SwapContents swapContents = peek(swapLocation, flowFileQueue);
         flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swapContents.getFlowFiles(), flowFileQueue);
 
@@ -311,7 +327,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             }
         }
 
-        Collections.sort(swapLocations, new SwapFileComparator());
+        swapLocations.sort(new SwapFileComparator());
         return swapLocations;
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index 058c714..df19f44 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -180,19 +180,23 @@ public class SwappablePriorityQueue {
         int flowFilesSwappedOut = 0;
         final List<String> swapLocations = new ArrayList<>(numSwapFiles);
         for (int i = 0; i < numSwapFiles; i++) {
+            long bytesSwappedThisIteration = 0L;
+
             // Create a new swap file for the next SWAP_RECORD_POLL_SIZE records
             final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE);
             for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) {
                 final FlowFileRecord flowFile = tempQueue.poll();
                 toSwap.add(flowFile);
-                bytesSwappedOut += flowFile.getSize();
-                flowFilesSwappedOut++;
+                bytesSwappedThisIteration += flowFile.getSize();
             }
 
             try {
                 Collections.reverse(toSwap); // currently ordered in reverse priority order based on the ordering of the temp queue.
                 final String swapLocation = swapManager.swapOut(toSwap, flowFileQueue, swapPartitionName);
                 swapLocations.add(swapLocation);
+
+                bytesSwappedOut += bytesSwappedThisIteration;
+                flowFilesSwappedOut += toSwap.size();
             } catch (final IOException ioe) {
                 tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue.
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index cc3ac19..216449c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -161,7 +161,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     // so that we are able to aggregate many into a single Fork Event.
     private final Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<>();
 
-    private Checkpoint checkpoint = new Checkpoint();
+    private Checkpoint checkpoint = null;
     private final ContentClaimWriteCache claimCache;
 
     public StandardProcessSession(final RepositoryContext context, final TaskTermination taskTermination) {
@@ -1489,7 +1489,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     private void registerDequeuedRecord(final FlowFileRecord flowFile, final Connection connection) {
         final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
-        records.put(flowFile.getId(), record);
+
+        // Ensure that the checkpoint does not have a FlowFile with the same ID already. This should not occur,
+        // but this is a safety check just to make sure, because if it were to occur, and we did process the FlowFile,
+        // we would have a lot of problems, since the map is keyed off of the FlowFile ID.
+        if (this.checkpoint != null) {
+            final StandardRepositoryRecord checkpointedRecord = this.checkpoint.getRecord(flowFile);
+            handleConflictingId(flowFile, connection, checkpointedRecord);
+        }
+
+        final StandardRepositoryRecord existingRecord = records.putIfAbsent(flowFile.getId(), record);
+        handleConflictingId(flowFile, connection, existingRecord); // Ensure that we have no conflicts
+
         flowFilesIn++;
         contentSizeIn += flowFile.getSize();
 
@@ -1503,6 +1514,21 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         incrementConnectionOutputCounts(connection, flowFile);
     }
 
+    private void handleConflictingId(final FlowFileRecord flowFile, final Connection connection, final StandardRepositoryRecord conflict) {
+        if (conflict == null) {
+            // No conflict
+            return;
+        }
+
+        LOG.error("Attempted to pull {} from {} but the Session already has a FlowFile with the same ID ({}): {}, which was pulled from {}. This means that the system has two FlowFiles with the" +
+            " same ID, which should not happen.", flowFile, connection, flowFile.getId(), conflict.getCurrent(), conflict.getOriginalQueue());
+        connection.getFlowFileQueue().put(flowFile);
+
+        rollback(true, false);
+        throw new FlowFileAccessException("Attempted to pull a FlowFile with ID " + flowFile.getId() + " from Connection "
+            + connection + " but a FlowFile with that ID already exists in the session");
+    }
+
     @Override
     public void adjustCounter(final String name, final long delta, final boolean immediate) {
         verifyTaskActive();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index dee5346..da714a6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -16,16 +16,16 @@
  */
 package org.apache.nifi.controller.repository;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * <p>
  * An in-memory implementation of the {@link FlowFileRepository}. Upon restart, all FlowFiles will be discarded, including those that have been swapped out by a {@link FlowFileSwapManager}.
@@ -137,4 +137,8 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
     public void swapFlowFilesOut(List<FlowFileRecord> swappedOut, FlowFileQueue queue, String swapLocation) throws IOException {
     }
 
+    @Override
+    public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
+        return false;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index b5a61c6..d8e45f2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -16,6 +16,19 @@
  */
 package org.apache.nifi.controller.repository;
 
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.MinimalLockingWriteAheadLog;
+import org.wali.SyncListener;
+import org.wali.WriteAheadRepository;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -42,19 +55,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.MinimalLockingWriteAheadLog;
-import org.wali.SyncListener;
-import org.wali.WriteAheadRepository;
-
 /**
  * <p>
  * Implements FlowFile Repository using WALI as the backing store.
@@ -101,6 +101,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     private final int numPartitions;
     private final ScheduledExecutorService checkpointExecutor;
 
+    private final Set<String> swapLocationSuffixes = new HashSet<>(); // guarded by synchronizing on object itself
+
     // effectively final
     private WriteAheadRepository<RepositoryRecord> wal;
     private RepositoryRecordSerdeFactory serdeFactory;
@@ -134,7 +136,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
      */
     public WriteAheadFlowFileRepository() {
         alwaysSync = false;
-        checkpointDelayMillis = 0l;
+        checkpointDelayMillis = 0L;
         numPartitions = 0;
         checkpointExecutor = null;
         walImplementation = null;
@@ -278,6 +280,13 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         return !resourceClaim.isInUse();
     }
 
+    @Override
+    public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
+        synchronized (swapLocationSuffixes) {
+            return swapLocationSuffixes.contains(swapLocationSuffix);
+        }
+    }
+
     private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException {
         for (final RepositoryRecord record : records) {
             if (record.getType() != RepositoryRecordType.DELETE && record.getType() != RepositoryRecordType.CONTENTMISSING
@@ -308,6 +317,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // This does not, however, cause problems, as ContentRepository should handle this
         // This does indicate that some refactoring should probably be performed, though, as this is not a very clean interface.
         final Set<ResourceClaim> claimsToAdd = new HashSet<>();
+
+        final Set<String> swapLocationsAdded = new HashSet<>();
+        final Set<String> swapLocationsRemoved = new HashSet<>();
+
         for (final RepositoryRecord record : records) {
             if (record.getType() == RepositoryRecordType.DELETE) {
                 // For any DELETE record that we have, if claim is destructible, mark it so
@@ -324,6 +337,14 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
                 if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && isDestructable(record.getOriginalClaim())) {
                     claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
                 }
+            } else if (record.getType() == RepositoryRecordType.SWAP_OUT) {
+                final String swapLocation = record.getSwapLocation();
+                swapLocationsAdded.add(swapLocation);
+                swapLocationsRemoved.remove(swapLocation);
+            } else if (record.getType() == RepositoryRecordType.SWAP_IN) {
+                final String swapLocation = record.getSwapLocation();
+                swapLocationsRemoved.add(swapLocation);
+                swapLocationsAdded.remove(swapLocation);
             }
 
             final List<ContentClaim> transientClaims = record.getTransientClaims();
@@ -336,6 +357,14 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
             }
         }
 
+        // If we have swapped files in or out, we need to ensure that we update our swapLocationSuffixes.
+        if (!swapLocationsAdded.isEmpty() || !swapLocationsRemoved.isEmpty()) {
+            synchronized (swapLocationSuffixes) {
+                swapLocationsRemoved.forEach(loc -> swapLocationSuffixes.remove(getLocationSuffix(loc)));
+                swapLocationsAdded.forEach(loc -> swapLocationSuffixes.add(getLocationSuffix(loc)));
+            }
+        }
+
         if (!claimsToAdd.isEmpty()) {
             // Get / Register a Set<ContentClaim> for the given Partiton Index
             final Integer partitionKey = Integer.valueOf(partitionIndex);
@@ -352,6 +381,20 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         }
     }
 
+    protected static String getLocationSuffix(final String swapLocation) {
+        if (swapLocation == null) {
+            return null;
+        }
+
+        // Drop one trailing slash (unless the location is just "/"), then keep only the last path segment.
+        final boolean hasTrailingSlash = swapLocation.length() > 1 && swapLocation.endsWith("/");
+        final String path = hasTrailingSlash ? swapLocation.substring(0, swapLocation.length() - 1) : swapLocation;
+        final int slashIndex = path.lastIndexOf('/');
+
+        // No slash at all, or nothing after the last slash: the whole path is the suffix.
+        return (slashIndex < 0 || slashIndex >= path.length() - 1) ? path : path.substring(slashIndex + 1);
+    }
+
     @Override
     public void onSync(final int partitionIndex) {
         final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex));
@@ -407,6 +450,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // update WALI to indicate that the records were swapped out.
         wal.update(repoRecords, true);
 
+        synchronized (this.swapLocationSuffixes) {
+            this.swapLocationSuffixes.add(getLocationSuffix(swapLocation));
+        }
+
         logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{swappedOut.size(), queue, swapLocation});
     }
 
@@ -423,6 +470,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         }
 
         updateRepository(repoRecords, true);
+
+        synchronized (this.swapLocationSuffixes) {
+            this.swapLocationSuffixes.remove(getLocationSuffix(swapLocation)); // swapped in: this swap file is consumed and must no longer be tracked as valid
+        }
+
         logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue});
     }
 
@@ -544,6 +596,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // Repo was written using that impl, that we properly recover from the implementation.
         Collection<RepositoryRecord> recordList = wal.recoverRecords();
 
+        final Set<String> recoveredSwapLocations = wal.getRecoveredSwapLocations();
+        synchronized (this.swapLocationSuffixes) {
+            recoveredSwapLocations.forEach(loc -> this.swapLocationSuffixes.add(getLocationSuffix(loc)));
+            logger.debug("Recovered {} Swap Files: {}", swapLocationSuffixes.size(), swapLocationSuffixes);
+        }
+
         // If we didn't recover any records from our write-ahead log, attempt to recover records from the other implementation
         // of the write-ahead log. We do this in case the user changed the "nifi.flowfile.repository.wal.impl" property.
         // In such a case, we still want to recover the records from the previous FlowFile Repository and write them into the new one.
@@ -591,7 +649,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will
         // return the appropriate number.
         flowFileSequenceGenerator.set(maxId + 1);
-        logger.info("Successfully restored {} FlowFiles", recordList.size() - numFlowFilesMissingQueue);
+        logger.info("Successfully restored {} FlowFiles and {} Swap Files", recordList.size() - numFlowFilesMissingQueue, recoveredSwapLocations.size());
         if (numFlowFilesMissingQueue > 0) {
             logger.warn("On recovery, found {} FlowFiles whose queue no longer exists. These FlowFiles will be dropped.", numFlowFilesMissingQueue);
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
index 33b71f0..a1206c7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
@@ -17,16 +17,6 @@
 
 package org.apache.nifi.controller;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
@@ -39,6 +29,16 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.swap.StandardSwapContents;
 import org.apache.nifi.controller.swap.StandardSwapSummary;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
 public class MockSwapManager implements FlowFileSwapManager {
     public final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
     public int swapOutCalledCount = 0;
@@ -49,10 +49,22 @@ public class MockSwapManager implements FlowFileSwapManager {
     public int failSwapInAfterN = -1;
     public Throwable failSwapInFailure = null;
 
+    private int failSwapOutAfterN = -1;
+    private IOException failSwapOutFailure = null;
+
     public void setSwapInFailure(final Throwable t) {
         this.failSwapInFailure = t;
     }
 
+    public void setSwapOutFailureOnNthIteration(final int n) {
+        setSwapOutFailureOnNthIteration(n, null);
+    }
+
+    public void setSwapOutFailureOnNthIteration(final int n, final IOException failureException) {
+        this.failSwapOutAfterN = n;
+        this.failSwapOutFailure = failureException;
+    }
+
     @Override
     public void initialize(final SwapManagerInitializationContext initializationContext) {
 
@@ -65,6 +77,12 @@ public class MockSwapManager implements FlowFileSwapManager {
     @Override
     public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
         swapOutCalledCount++;
+
+        if (failSwapOutAfterN > -1 && swapOutCalledCount >= failSwapOutAfterN) {
+            final IOException ioe = failSwapOutFailure == null ? new IOException("Intentional Unit Test IOException on swap out call number " + swapOutCalledCount) : failSwapOutFailure;
+            throw ioe;
+        }
+
         final String location = UUID.randomUUID().toString() + "." + partitionName;
         swappedOut.put(location, new ArrayList<>(flowFiles));
         return location;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 46bea31..dd71f0e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -16,28 +16,37 @@
  */
 package org.apache.nifi.controller;
 
-import static org.junit.Assert.assertEquals;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.SwapContents;
-import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.events.EventReporter;
-import org.junit.Test;
-import org.mockito.Mockito;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
 
 public class TestFileSystemSwapManager {
 
@@ -48,7 +57,7 @@ public class TestFileSystemSwapManager {
                 final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) {
 
             final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
-            Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+            when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
 
             final FileSystemSwapManager swapManager = createSwapManager();
             final SwapContents swapContents = swapManager.peek("src/test/resources/old-swap-file.swap", flowFileQueue);
@@ -63,11 +72,88 @@ public class TestFileSystemSwapManager {
         }
     }
 
+    @Test
+    public void testFailureOnRepoSwapOut() throws IOException {
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
+        Mockito.doThrow(new IOException("Intentional IOException for unit test"))
+            .when(flowFileRepo).updateRepository(anyCollection());
+
+        final FileSystemSwapManager swapManager = createSwapManager(flowFileRepo); // wire in the failing repo mock configured above
+
+        final List<FlowFileRecord> flowFileRecords = new ArrayList<>();
+        for (int i=0; i < 10000; i++) {
+            flowFileRecords.add(new MockFlowFileRecord(i));
+        }
+
+        try {
+            swapManager.swapOut(flowFileRecords, flowFileQueue, "partition-1");
+            Assert.fail("Expected IOException");
+        } catch (final IOException ioe) {
+            // expected: swapOut must propagate the failure as an IOException rather than report success
+        }
+    }
+
+    @Test
+    public void testSwapFileUnknownToRepoNotSwappedIn() throws IOException {
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        when(flowFileQueue.getIdentifier()).thenReturn("");
+
+        final File targetDir = new File("target/swap");
+        targetDir.mkdirs();
+
+        final File targetFile = new File(targetDir, "444-old-swap-file.swap");
+        final File originalSwapFile = new File("src/test/resources/swap/444-old-swap-file.swap");
+        try (final OutputStream fos = new FileOutputStream(targetFile);
+             final InputStream fis = new FileInputStream(originalSwapFile)) {
+            StreamUtils.copy(fis, fos);
+        }
+
+        final FileSystemSwapManager swapManager = new FileSystemSwapManager(Paths.get("target"));
+        final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager();
+        final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
+
+        swapManager.initialize(new SwapManagerInitializationContext() {
+            @Override
+            public ResourceClaimManager getResourceClaimManager() {
+                return resourceClaimManager;
+            }
+
+            @Override
+            public FlowFileRepository getFlowFileRepository() {
+                return flowFileRepo;
+            }
+
+            @Override
+            public EventReporter getEventReporter() {
+                return EventReporter.NO_OP;
+            }
+        });
+
+        when(flowFileRepo.isValidSwapLocationSuffix(anyString())).thenReturn(false);
+        final List<String> recoveredLocations = swapManager.recoverSwapLocations(flowFileQueue, null);
+        assertEquals(1, recoveredLocations.size());
+
+        final String firstLocation = recoveredLocations.get(0);
+        final SwapContents emptyContents = swapManager.swapIn(firstLocation, flowFileQueue);
+        assertEquals(0, emptyContents.getFlowFiles().size());
+
+        when(flowFileRepo.isValidSwapLocationSuffix(anyString())).thenReturn(true);
+        when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+        final SwapContents contents = swapManager.swapIn(firstLocation, flowFileQueue);
+        assertEquals(10000, contents.getFlowFiles().size());
+    }
 
     private FileSystemSwapManager createSwapManager() {
+        final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
+        return createSwapManager(flowFileRepo);
+    }
+
+    private FileSystemSwapManager createSwapManager(final FlowFileRepository flowFileRepo) {
         final FileSystemSwapManager swapManager = new FileSystemSwapManager();
         final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager();
-        final FlowFileRepository flowfileRepo = Mockito.mock(FlowFileRepository.class);
         swapManager.initialize(new SwapManagerInitializationContext() {
             @Override
             public ResourceClaimManager getResourceClaimManager() {
@@ -76,7 +162,7 @@ public class TestFileSystemSwapManager {
 
             @Override
             public FlowFileRepository getFlowFileRepository() {
-                return flowfileRepo;
+                return flowFileRepo;
             }
 
             @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
index 71ad257..ef1a063 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
@@ -17,21 +17,6 @@
 
 package org.apache.nifi.controller.queue.clustered;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 import org.apache.nifi.controller.MockFlowFileRecord;
 import org.apache.nifi.controller.MockSwapManager;
 import org.apache.nifi.controller.queue.DropFlowFileAction;
@@ -42,16 +27,34 @@ import org.apache.nifi.controller.queue.SwappablePriorityQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class TestSwappablePriorityQueue {
 
     private MockSwapManager swapManager;
-    private final EventReporter eventReporter = EventReporter.NO_OP;
+    private final List<String> events = new ArrayList<>();
+    private EventReporter eventReporter;
+
     private final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
     private final DropFlowFileAction dropAction = (flowFiles, requestor) -> {
         return new QueueSize(flowFiles.size(), flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
@@ -63,12 +66,36 @@ public class TestSwappablePriorityQueue {
     public void setup() {
         swapManager = new MockSwapManager();
 
+        events.clear();
+        eventReporter = new EventReporter() {
+            @Override
+            public void reportEvent(final Severity severity, final String category, final String message) {
+                events.add(message);
+            }
+        };
+
         when(flowFileQueue.getIdentifier()).thenReturn("unit-test");
         queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter, flowFileQueue, dropAction, "local");
     }
 
 
     @Test
+    public void testSwapOutFailureLeavesCorrectQueueSize() {
+        swapManager.setSwapOutFailureOnNthIteration(1, null);
+
+        for (int i = 0; i < 19999; i++) {
+            queue.put(new MockFlowFile(i));
+        }
+
+        assertEquals(19999, queue.size().getObjectCount());
+        assertEquals(0, events.size());
+
+        queue.put(new MockFlowFile(20000));
+        assertEquals(20000, queue.size().getObjectCount());
+        assertEquals(1, events.size()); // Expect a single failure event to be emitted
+    }
+
+    @Test
     public void testPrioritizer() {
         final FlowFilePrioritizer prioritizer = (o1, o2) -> Long.compare(o1.getId(), o2.getId());
         queue.setPriorities(Collections.singletonList(prioritizer));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index efe2bd4..7cd2fd6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -277,14 +277,13 @@ public class TestStandardProcessSession {
         connList.add(conn1);
         connList.add(conn2);
 
-        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+        final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder()
             .id(1000L)
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-            .entryDate(System.currentTimeMillis())
-            .build();
+            .entryDate(System.currentTimeMillis());
 
-        flowFileQueue.put(flowFileRecord);
-        flowFileQueue.put(flowFileRecord);
+        flowFileQueue.put(flowFileRecordBuilder.build());
+        flowFileQueue.put(flowFileRecordBuilder.id(1001).build());
 
         when(connectable.getIncomingConnections()).thenReturn(connList);
 
@@ -296,6 +295,36 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testHandlingOfMultipleFlowFilesWithSameId() {
+        // Add two FlowFiles with the same ID
+        for (int i=0; i < 2; i++) {
+            final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+                .id(1000L)
+                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+                .entryDate(System.currentTimeMillis())
+                .size(0L)
+                .build();
+
+            flowFileQueue.put(flowFileRecord);
+        }
+
+        final Relationship relationship = new Relationship.Builder().name("A").build();
+
+        FlowFile ff1 = session.get();
+        assertNotNull(ff1);
+
+        session.transfer(ff1, relationship);
+
+        try {
+            session.get();
+            Assert.fail("Should not have been able to poll second FlowFile with same ID");
+        } catch (final FlowFileAccessException e) {
+            // Expected
+        }
+    }
+
+
+    @Test
     public void testCloneOriginalDataSmaller() throws IOException {
         final byte[] originalContent = "hello".getBytes();
         final byte[] replacementContent = "NEW DATA".getBytes();
@@ -416,14 +445,14 @@ public class TestStandardProcessSession {
         connList.add(conn1);
         connList.add(conn2);
 
-        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+
+        final StandardFlowFileRecord.Builder flowFileRecord = new StandardFlowFileRecord.Builder()
             .id(1000L)
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-            .entryDate(System.currentTimeMillis())
-            .build();
+            .entryDate(System.currentTimeMillis());
 
-        flowFileQueue.put(flowFileRecord);
-        flowFileQueue.put(flowFileRecord);
+        flowFileQueue.put(flowFileRecord.build());
+        flowFileQueue.put(flowFileRecord.id(1001).build());
 
         when(connectable.getIncomingConnections()).thenReturn(connList);
 
@@ -475,14 +504,13 @@ public class TestStandardProcessSession {
         connList.add(conn1);
         connList.add(conn2);
 
-        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+        final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder()
             .id(1000L)
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-            .entryDate(System.currentTimeMillis())
-            .build();
+            .entryDate(System.currentTimeMillis());
 
-        flowFileQueue.put(flowFileRecord);
-        flowFileQueue.put(flowFileRecord);
+        flowFileQueue.put(flowFileRecordBuilder.build());
+        flowFileQueue.put(flowFileRecordBuilder.id(10001L).build());
 
         when(connectable.getIncomingConnections()).thenReturn(connList);
 
@@ -1383,10 +1411,11 @@ public class TestStandardProcessSession {
     @Test
     public void testContentNotFoundExceptionThrownWhenUnableToReadDataStreamCallbackOffsetTooLarge() {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
+            .id(1)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
             .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
-                .build();
+            .build();
         flowFileQueue.put(flowFileRecord);
 
         FlowFile ff1 = session.get();
@@ -1399,12 +1428,13 @@ public class TestStandardProcessSession {
         session.commit();
 
         final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
+            .id(2)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
             .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
-                .contentClaimOffset(1000L)
-                .size(1000L)
-                .build();
+            .contentClaimOffset(1000L)
+            .size(1000L)
+            .build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.
@@ -1424,10 +1454,11 @@ public class TestStandardProcessSession {
     @Test
     public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
+            .id(1)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
             .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
-                .build();
+            .build();
 
         flowFileQueue.put(flowFileRecord);
 
@@ -1441,10 +1472,11 @@ public class TestStandardProcessSession {
         session.commit();
 
         final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
+            .id(2)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
             .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
-                .contentClaimOffset(1000L).size(1L).build();
+            .contentClaimOffset(1000L).size(1L).build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.
@@ -1544,11 +1576,15 @@ public class TestStandardProcessSession {
 
     @Test
     public void testRollbackAfterCheckpoint() {
-        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
+        final StandardFlowFileRecord.Builder recordBuilder = new StandardFlowFileRecord.Builder()
+            .id(1)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
             .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
-                .contentClaimOffset(0L).size(0L).build();
+            .contentClaimOffset(0L)
+            .size(0L);
+
+        final FlowFileRecord flowFileRecord = recordBuilder.build();
         flowFileQueue.put(flowFileRecord);
 
         final FlowFile originalFlowFile = session.get();
@@ -1574,7 +1610,7 @@ public class TestStandardProcessSession {
 
         session.rollback();
 
-        flowFileQueue.put(flowFileRecord);
+        flowFileQueue.put(recordBuilder.id(2).build());
         assertFalse(flowFileQueue.isActiveQueueEmpty());
 
         final FlowFile originalRound2 = session.get();
@@ -1596,8 +1632,8 @@ public class TestStandardProcessSession {
 
         session.commit();
 
-        // FlowFile transferred back to queue
-        assertEquals(1, flowFileQueue.size().getObjectCount());
+        // FlowFiles transferred back to queue
+        assertEquals(2, flowFileQueue.size().getObjectCount());
         assertFalse(flowFileQueue.isUnacknowledgedFlowFile());
         assertFalse(flowFileQueue.isActiveQueueEmpty());
     }
@@ -2116,6 +2152,11 @@ public class TestStandardProcessSession {
         @Override
         public void initialize(ResourceClaimManager claimManager) throws IOException {
         }
+
+        @Override
+        public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
+            return false;
+        }
     }
 
     private static class MockContentRepository implements ContentRepository {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index a3ee5c1..1761bd8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -70,6 +70,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -396,6 +397,101 @@ public class TestWriteAheadFlowFileRepository {
     }
 
 
+    @Test
+    public void testGetLocationSuffix() {
+        assertEquals("/", WriteAheadFlowFileRepository.getLocationSuffix("/"));
+        assertEquals("", WriteAheadFlowFileRepository.getLocationSuffix(""));
+        assertEquals(null, WriteAheadFlowFileRepository.getLocationSuffix(null));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("test.txt"));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/test.txt"));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/tmp/test.txt"));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("//test.txt"));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/path/to/other/file/repository/test.txt"));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("test.txt/"));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/path/to/test.txt/"));
+    }
+
+    @Test
+    public void testSwapLocationsRestored() throws IOException {
+        final Path path = Paths.get("target/test-swap-repo");
+        if (Files.exists(path)) {
+            FileUtils.deleteFile(path.toFile(), true);
+        }
+
+        final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
+        repo.initialize(new StandardResourceClaimManager());
+
+        final TestQueueProvider queueProvider = new TestQueueProvider();
+        repo.loadFlowFiles(queueProvider, 0L);
+
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("1234");
+
+        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+        when(queue.getIdentifier()).thenReturn("1234");
+        when(connection.getFlowFileQueue()).thenReturn(queue);
+
+        queueProvider.addConnection(connection);
+
+        StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+        ffBuilder.id(1L);
+        ffBuilder.size(0L);
+        final FlowFileRecord flowFileRecord = ffBuilder.build();
+
+        final List<RepositoryRecord> records = new ArrayList<>();
+        final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "swap123");
+        record.setDestination(queue);
+        records.add(record);
+
+        repo.updateRepository(records);
+        repo.close();
+
+        // restore
+        final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
+        repo2.initialize(new StandardResourceClaimManager());
+        repo2.loadFlowFiles(queueProvider, 0L);
+        assertTrue(repo2.isValidSwapLocationSuffix("swap123"));
+        assertFalse(repo2.isValidSwapLocationSuffix("other"));
+        repo2.close();
+    }
+
+    @Test
+    public void testSwapLocationsUpdatedOnRepoUpdate() throws IOException {
+        final Path path = Paths.get("target/test-swap-repo");
+        if (Files.exists(path)) {
+            FileUtils.deleteFile(path.toFile(), true);
+        }
+
+        final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
+        repo.initialize(new StandardResourceClaimManager());
+
+        final TestQueueProvider queueProvider = new TestQueueProvider();
+        repo.loadFlowFiles(queueProvider, 0L);
+
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("1234");
+
+        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+        when(queue.getIdentifier()).thenReturn("1234");
+        when(connection.getFlowFileQueue()).thenReturn(queue);
+
+        queueProvider.addConnection(connection);
+
+        StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+        ffBuilder.id(1L);
+        ffBuilder.size(0L);
+        final FlowFileRecord flowFileRecord = ffBuilder.build();
+
+        final List<RepositoryRecord> records = new ArrayList<>();
+        final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "/tmp/swap123");
+        record.setDestination(queue);
+        records.add(record);
+
+        assertFalse(repo.isValidSwapLocationSuffix("swap123"));
+        repo.updateRepository(records);
+        assertTrue(repo.isValidSwapLocationSuffix("swap123"));
+        repo.close();
+    }
 
     @Test
     public void testResourceClaimsIncremented() throws IOException {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/swap/444-old-swap-file.swap b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/swap/444-old-swap-file.swap
new file mode 100755
index 0000000..0176ed9
Binary files /dev/null and b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/swap/444-old-swap-file.swap differ