You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/02/18 22:26:39 UTC

nifi git commit: NIFI-1527: Ensure that we increment Claimant Counts for content claims that are referenced by Swapped-Out FlowFiles on restart of nifi

Repository: nifi
Updated Branches:
  refs/heads/master 1c73d9090 -> 3bb18b965


NIFI-1527: Ensure that we increment Claimant Counts for content claims that are referenced by Swapped-Out FlowFiles on restart of nifi


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3bb18b96
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3bb18b96
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3bb18b96

Branch: refs/heads/master
Commit: 3bb18b965348b439cadc8f548429ab1347f9c305
Parents: 1c73d90
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Feb 18 11:14:35 2016 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Feb 18 16:25:57 2016 -0500

----------------------------------------------------------------------
 .../nifi/controller/queue/FlowFileQueue.java    |  13 +-
 .../apache/nifi/controller/queue/QueueSize.java |  26 +-
 .../repository/FlowFileSwapManager.java         |  17 +-
 .../nifi/controller/repository/SwapSummary.java |  50 ++++
 .../nifi/controller/FileSystemSwapManager.java  |  52 ++--
 .../apache/nifi/controller/FlowController.java  |  14 +-
 .../nifi/controller/StandardFlowFileQueue.java  |  13 +-
 .../controller/swap/StandardSwapSummary.java    |  54 ++++
 .../controller/TestStandardFlowFileQueue.java   |  31 ++-
 .../TestWriteAheadFlowFileRepository.java       | 247 +++++++++++++++++--
 10 files changed, 418 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index f2066a8..7948ecb 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapSummary;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.processor.FlowFileFilter;
 
@@ -40,16 +41,14 @@ public interface FlowFileQueue {
     List<FlowFilePrioritizer> getPriorities();
 
     /**
-     * Reads any Swap Files that belong to this queue and increments counts so that the size
-     * of the queue will reflect the size of all FlowFiles regardless of whether or not they are
-     * swapped out. This will be called only during NiFi startup as an initialization step. This
-     * method is then responsible for returning the largest ID of any FlowFile that is swapped
+     * Reads any Swap Files that belong to this queue and returns a summary of what is swapped out.
+     * This will be called only during NiFi startup as an initialization step. This
+     * method is then responsible for returning a FlowFileSummary of the FlowFiles that are swapped
      * out, or <code>null</code> if no FlowFiles are swapped out for this queue.
      *
-     * @return the largest ID of any FlowFile that is swapped out for this queue, or <code>null</code> if
-     *         no FlowFiles are swapped out for this queue.
+     * @return a SwapSummary that describes the FlowFiles that exist in the queue but are swapped out.
      */
-    Long recoverSwappedFlowFiles();
+    SwapSummary recoverSwappedFlowFiles();
 
     /**
      * Destroys any Swap Files that exist for this queue without updating the FlowFile Repository

http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
index a59db4c..35a860b 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
@@ -22,9 +22,9 @@ import java.text.NumberFormat;
  *
  */
 public class QueueSize {
-
     private final int objectCount;
     private final long totalSizeBytes;
+    private final int hashCode;
 
     public QueueSize(final int numberObjects, final long totalSizeBytes) {
         if (numberObjects < 0 || totalSizeBytes < 0) {
@@ -32,6 +32,7 @@ public class QueueSize {
         }
         objectCount = numberObjects;
         this.totalSizeBytes = totalSizeBytes;
+        hashCode = (int) (41 + 47 * objectCount + 51 * totalSizeBytes);
     }
 
     /**
@@ -66,4 +67,27 @@ public class QueueSize {
     public String toString() {
         return "QueueSize[FlowFiles=" + objectCount + ", ContentSize=" + NumberFormat.getNumberInstance().format(totalSizeBytes) + " Bytes]";
     }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (obj == null) {
+            return false;
+        }
+
+        if (!(obj instanceof QueueSize)) {
+            return false;
+        }
+
+        final QueueSize other = (QueueSize) obj;
+        return getObjectCount() == other.getObjectCount() && getByteCount() == other.getByteCount();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 3e341f8..e07cf1a 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.queue.QueueSize;
 
 /**
  * Defines a mechanism by which FlowFiles can be move into external storage or
@@ -90,20 +89,14 @@ public interface FlowFileSwapManager {
     List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException;
 
     /**
-     * Determines how many FlowFiles and the size of the FlowFiles that are swapped out at the given location
+     * Parses the contents of the swap file at the given location and provides a SwapSummary that provides
+     * pertinent information about the information stored within the swap file
      *
      * @param swapLocation the location of the swap file
-     * @return the QueueSize representing the number of FlowFiles and total size of the FlowFiles that are swapped out
+     * @return a SwapSummary that provides information about what is contained within the swap file
+     * @throws IOException if unable to read or parse the swap file
      */
-    QueueSize getSwapSize(String swapLocation) throws IOException;
-
-    /**
-     * Returns the maximum record id of the FlowFiles stored at the given swap location
-     *
-     * @param swapLocation the swap location to read id's from
-     * @return the max record id of any FlowFile in the swap location, or null if no record ID's can be found
-     */
-    Long getMaxRecordId(String swapLocation) throws IOException;
+    SwapSummary getSwapSummary(String swapLocation) throws IOException;
 
     /**
      * Purge all known Swap Files without updating FlowFileRepository or Provenance Repository

http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapSummary.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapSummary.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapSummary.java
new file mode 100644
index 0000000..ad01ccd
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapSummary.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.util.List;
+
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+
+/**
+ * <p>
+ * Provides a summary of the information that is stored in a FlowFile swap file.
+ * </p>
+ */
+public interface SwapSummary {
+    /**
+     * @return a QueueSize that represents the number of FlowFiles are in the swap file and their
+     *         aggregate content size
+     */
+    QueueSize getQueueSize();
+
+    /**
+     * @return the largest ID of any of the FlowFiles that are contained in the swap file
+     */
+    Long getMaxFlowFileId();
+
+    /**
+     * Returns a List of all ResoruceClaims that are referenced by the FlowFiles in the swap file.
+     * This List may well contain the same ResourceClaim many times. This indicates that many FlowFiles
+     * reference the same ResourceClaim.
+     *
+     * @return a List of all ResourceClaims that are referenced by the FlowFiles in the swap file
+     */
+    List<ResourceClaim> getResourceClaims();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 76f7cd7..105dcff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -47,10 +47,12 @@ import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.SwapSummary;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.stream.io.BufferedOutputStream;
@@ -237,8 +239,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         return swapLocations;
     }
 
+    @SuppressWarnings("deprecation")
     @Override
-    public QueueSize getSwapSize(final String swapLocation) throws IOException {
+    public SwapSummary getSwapSummary(final String swapLocation) throws IOException {
         final File swapFile = new File(swapLocation);
 
         // read record from disk via the swap file
@@ -259,56 +262,36 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             final int numRecords = in.readInt();
             final long contentSize = in.readLong();
 
-            return new QueueSize(numRecords, contentSize);
-        }
-    }
-
-    @Override
-    public Long getMaxRecordId(final String swapLocation) throws IOException {
-        final File swapFile = new File(swapLocation);
-
-        // read record from disk via the swap file
-        try (final InputStream fis = new FileInputStream(swapFile);
-            final InputStream bufferedIn = new BufferedInputStream(fis);
-            final DataInputStream in = new DataInputStream(bufferedIn)) {
-
-            final int swapEncodingVersion = in.readInt();
-            if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
-                final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
-                    + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
-
-                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
-                throw new IOException(errMsg);
-            }
-
-            in.readUTF(); // ignore connection id
-            final int numRecords = in.readInt();
-            in.readLong(); // ignore content size
-
             if (numRecords == 0) {
-                return null;
+                return StandardSwapSummary.EMPTY_SUMMARY;
             }
 
+            Long maxRecordId = null;
             if (swapEncodingVersion > 7) {
-                final long maxRecordId = in.readLong();
-                return maxRecordId;
+                maxRecordId = in.readLong();
             }
 
             // Before swap encoding version 8, we did not write out the max record id, so we have to read all
             // swap files to determine the max record id
+            final List<ResourceClaim> resourceClaims = new ArrayList<>(numRecords);
             final List<FlowFileRecord> records = deserializeFlowFiles(in, numRecords, swapEncodingVersion, true, claimManager);
-            long maxId = 0L;
             for (final FlowFileRecord record : records) {
-                if (record.getId() > maxId) {
-                    maxId = record.getId();
+                if (maxRecordId == null || record.getId() > maxRecordId) {
+                    maxRecordId = record.getId();
+                }
+
+                final ContentClaim contentClaim = record.getContentClaim();
+                if (contentClaim != null) {
+                    resourceClaims.add(contentClaim.getResourceClaim());
                 }
             }
 
-            return maxId;
+            return new StandardSwapSummary(new QueueSize(numRecords, contentSize), maxRecordId, resourceClaims);
         }
     }
 
 
+    @SuppressWarnings("deprecation")
     public static int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
         if (toSwap == null || toSwap.isEmpty()) {
             return 0;
@@ -636,5 +619,4 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index dab2d3d..6d09bf6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -73,6 +73,7 @@ import org.apache.nifi.controller.repository.StandardCounterRepository;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import org.apache.nifi.controller.repository.StandardRepositoryRecord;
 import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.SwapSummary;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
@@ -591,9 +592,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             } else {
                 for (final Connection connection : connections) {
                     final FlowFileQueue queue = connection.getFlowFileQueue();
-                    final Long maxFlowFileId = queue.recoverSwappedFlowFiles();
-                    if (maxFlowFileId != null && maxFlowFileId > maxIdFromSwapFiles) {
-                        maxIdFromSwapFiles = maxFlowFileId;
+                    final SwapSummary swapSummary = queue.recoverSwappedFlowFiles();
+                    if (swapSummary != null) {
+                        final Long maxFlowFileId = swapSummary.getMaxFlowFileId();
+                        if (maxFlowFileId != null && maxFlowFileId > maxIdFromSwapFiles) {
+                            maxIdFromSwapFiles = maxFlowFileId;
+                        }
+
+                        for (final ResourceClaim resourceClaim : swapSummary.getResourceClaims()) {
+                            resourceClaimManager.incrementClaimantCount(resourceClaim);
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 04763ee..22aacdc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -49,9 +49,11 @@ import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
 import org.apache.nifi.controller.repository.RepositoryRecord;
 import org.apache.nifi.controller.repository.RepositoryRecordType;
+import org.apache.nifi.controller.repository.SwapSummary;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
@@ -787,10 +789,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     }
 
     @Override
-    public Long recoverSwappedFlowFiles() {
+    public SwapSummary recoverSwappedFlowFiles() {
         int swapFlowFileCount = 0;
         long swapByteCount = 0L;
         Long maxId = null;
+        List<ResourceClaim> resourceClaims = new ArrayList<>();
 
         writeLock.lock();
         try {
@@ -809,8 +812,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
             for (final String swapLocation : swapLocations) {
                 try {
-                    final QueueSize queueSize = swapManager.getSwapSize(swapLocation);
-                    final Long maxSwapRecordId = swapManager.getMaxRecordId(swapLocation);
+                    final SwapSummary summary = swapManager.getSwapSummary(swapLocation);
+                    final QueueSize queueSize = summary.getQueueSize();
+                    final Long maxSwapRecordId = summary.getMaxFlowFileId();
                     if (maxSwapRecordId != null) {
                         if (maxId == null || maxSwapRecordId > maxId) {
                             maxId = maxSwapRecordId;
@@ -819,6 +823,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
                     swapFlowFileCount += queueSize.getObjectCount();
                     swapByteCount += queueSize.getByteCount();
+                    resourceClaims.addAll(summary.getResourceClaims());
                 } catch (final IOException ioe) {
                     logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation, ioe.toString());
                     logger.error("", ioe);
@@ -835,7 +840,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             writeLock.unlock("Recover Swap Files");
         }
 
-        return maxId;
+        return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims);
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapSummary.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapSummary.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapSummary.java
new file mode 100644
index 0000000..57a2ffb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapSummary.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+
+public class StandardSwapSummary implements SwapSummary {
+    public static final SwapSummary EMPTY_SUMMARY = new StandardSwapSummary(new QueueSize(0, 0L), null, Collections.<ResourceClaim> emptyList());
+
+    private final QueueSize queueSize;
+    private final Long maxFlowFileId;
+    private final List<ResourceClaim> resourceClaims;
+
+    public StandardSwapSummary(final QueueSize queueSize, final Long maxFlowFileId, final List<ResourceClaim> resourceClaims) {
+        this.queueSize = queueSize;
+        this.maxFlowFileId = maxFlowFileId;
+        this.resourceClaims = Collections.unmodifiableList(resourceClaims);
+    }
+
+    @Override
+    public QueueSize getQueueSize() {
+        return queueSize;
+    }
+
+    @Override
+    public Long getMaxFlowFileId() {
+        return maxFlowFileId;
+    }
+
+    @Override
+    public List<ResourceClaim> getResourceClaims() {
+        return resourceClaims;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index b84ae90..412e376 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -46,8 +46,11 @@ import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
 import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.SwapSummary;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -472,37 +475,30 @@ public class TestStandardFlowFileQueue {
         }
 
         @Override
-        public QueueSize getSwapSize(String swapLocation) throws IOException {
+        @SuppressWarnings("deprecation")
+        public SwapSummary getSwapSummary(String swapLocation) throws IOException {
             final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
             if (flowFiles == null) {
-                return new QueueSize(0, 0L);
+                return StandardSwapSummary.EMPTY_SUMMARY;
             }
 
             int count = 0;
             long size = 0L;
+            Long max = null;
+            final List<ResourceClaim> resourceClaims = new ArrayList<>();
             for (final FlowFileRecord flowFile : flowFiles) {
                 count++;
                 size += flowFile.getSize();
-            }
-
-            return new QueueSize(count, size);
-        }
-
-        @Override
-        public Long getMaxRecordId(String swapLocation) throws IOException {
-            final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
-            if (flowFiles == null) {
-                return null;
-            }
-
-            Long max = null;
-            for (final FlowFileRecord flowFile : flowFiles) {
                 if (max == null || flowFile.getId() > max) {
                     max = flowFile.getId();
                 }
+
+                if (flowFile.getContentClaim() != null) {
+                    resourceClaims.add(flowFile.getContentClaim().getResourceClaim());
+                }
             }
 
-            return max;
+            return new StandardSwapSummary(new QueueSize(count, size), max, resourceClaims);
         }
 
         @Override
@@ -588,6 +584,7 @@ public class TestStandardFlowFileQueue {
         }
 
         @Override
+        @SuppressWarnings("deprecation")
         public int compareTo(final FlowFile o) {
             return Long.compare(id, o.getId());
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index e836b44..c1c7b45 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -16,27 +16,40 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
-import org.apache.nifi.controller.repository.StandardRepositoryRecord;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
+import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
 import org.apache.nifi.util.file.FileUtils;
-
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -44,9 +57,106 @@ import org.mockito.stubbing.Answer;
 
 public class TestWriteAheadFlowFileRepository {
 
+    @BeforeClass
+    public static void setupProperties() {
+        System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
+    }
+
+    @Before
+    public void clearRepo() throws IOException {
+        final File target = new File("target");
+        final File testRepo = new File(target, "test-repo");
+        if (testRepo.exists()) {
+            FileUtils.deleteFile(testRepo, true);
+        }
+    }
+
+    @Test
+    public void testResourceClaimsIncremented() throws IOException {
+        final ResourceClaimManager claimManager = new StandardResourceClaimManager();
+
+        final TestQueueProvider queueProvider = new TestQueueProvider();
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("1234");
+        when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+
+        final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+        final FlowFileQueue queue = new StandardFlowFileQueue("1234", connection, null, null, claimManager, null, swapMgr, null, 10000, null);
+
+        when(connection.getFlowFileQueue()).thenReturn(queue);
+        queueProvider.addConnection(connection);
+
+        final ResourceClaim resourceClaim1 = new StandardResourceClaim("container", "section", "1", false);
+        final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L);
+
+        final ResourceClaim resourceClaim2 = new StandardResourceClaim("container", "section", "2", false);
+        final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L);
+
+        // Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then,
+        // indicate that a FlowFile was swapped out. We should then be able to recover these FlowFiles and the
+        // resource claims' counts should be updated for both the swapped out FlowFile and the non-swapped out FlowFile
+        try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository()) {
+            repo.initialize(claimManager);
+            repo.loadFlowFiles(queueProvider, -1L);
+
+            // Create a Repository Record that indicates that a FlowFile was created
+            final FlowFileRecord flowFile1 = new StandardFlowFileRecord.Builder()
+                .id(1L)
+                .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+                .contentClaim(claim1)
+                .build();
+            final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue);
+            rec1.setWorking(flowFile1);
+            rec1.setDestination(queue);
+
+            // Create a Record that we can swap out
+            final FlowFileRecord flowFile2 = new StandardFlowFileRecord.Builder()
+                .id(2L)
+                .addAttribute("uuid", "11111111-1111-1111-1111-111111111112")
+                .contentClaim(claim2)
+                .build();
+
+            final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue);
+            rec2.setWorking(flowFile2);
+            rec2.setDestination(queue);
+
+            final List<RepositoryRecord> records = new ArrayList<>();
+            records.add(rec1);
+            records.add(rec2);
+            repo.updateRepository(records);
+
+            final String swapLocation = swapMgr.swapOut(Collections.singletonList(flowFile2), queue);
+            repo.swapFlowFilesOut(Collections.singletonList(flowFile2), queue, swapLocation);
+        }
+
+
+        final ResourceClaimManager recoveryClaimManager = new StandardResourceClaimManager();
+        try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository()) {
+            repo.initialize(recoveryClaimManager);
+            final long largestId = repo.loadFlowFiles(queueProvider, 0L);
+
+            // largest ID known is 1 because this doesn't take into account the FlowFiles that have been swapped out
+            assertEquals(1, largestId);
+        }
+
+        // resource claim 1 will have a single claimant count while resource claim 2 will have no claimant counts
+        // because resource claim 2 is referenced only by flowfiles that are swapped out.
+        assertEquals(1, recoveryClaimManager.getClaimantCount(resourceClaim1));
+        assertEquals(0, recoveryClaimManager.getClaimantCount(resourceClaim2));
+
+        final SwapSummary summary = queue.recoverSwappedFlowFiles();
+        assertNotNull(summary);
+        assertEquals(2, summary.getMaxFlowFileId().intValue());
+        assertEquals(new QueueSize(1, 0L), summary.getQueueSize());
+
+        final List<ResourceClaim> swappedOutClaims = summary.getResourceClaims();
+        assertNotNull(swappedOutClaims);
+        assertEquals(1, swappedOutClaims.size());
+        assertEquals(claim2.getResourceClaim(), swappedOutClaims.get(0));
+    }
+
     @Test
     public void testRestartWithOneRecord() throws IOException {
-        System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
         final Path path = Paths.get("target/test-repo");
         if (Files.exists(path)) {
             FileUtils.deleteFile(path.toFile(), true);
@@ -55,19 +165,7 @@ public class TestWriteAheadFlowFileRepository {
         final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository();
         repo.initialize(new StandardResourceClaimManager());
 
-        final List<Connection> connectionList = new ArrayList<>();
-        final QueueProvider queueProvider = new QueueProvider() {
-            @Override
-            public Collection<FlowFileQueue> getAllQueues() {
-                final List<FlowFileQueue> queueList = new ArrayList<>();
-                for (final Connection conn : connectionList) {
-                    queueList.add(conn.getFlowFileQueue());
-                }
-
-                return queueList;
-            }
-        };
-
+        final TestQueueProvider queueProvider = new TestQueueProvider();
         repo.loadFlowFiles(queueProvider, 0L);
 
         final List<FlowFileRecord> flowFileCollection = new ArrayList<>();
@@ -87,7 +185,7 @@ public class TestWriteAheadFlowFileRepository {
 
         when(connection.getFlowFileQueue()).thenReturn(queue);
 
-        connectionList.add(connection);
+        queueProvider.addConnection(connection);
 
         StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
         ffBuilder.id(1L);
@@ -132,4 +230,113 @@ public class TestWriteAheadFlowFileRepository {
         repo2.close();
     }
 
+    private static class TestQueueProvider implements QueueProvider {
+        private List<Connection> connectionList = new ArrayList<>();
+
+        public void addConnection(final Connection connection) {
+            this.connectionList.add(connection);
+        }
+
+        @Override
+        public Collection<FlowFileQueue> getAllQueues() {
+            final List<FlowFileQueue> queueList = new ArrayList<>();
+            for (final Connection conn : connectionList) {
+                queueList.add(conn.getFlowFileQueue());
+            }
+
+            return queueList;
+        }
+    }
+
+    private static class MockFlowFileSwapManager implements FlowFileSwapManager {
+        private final Map<FlowFileQueue, Map<String, List<FlowFileRecord>>> swappedRecords = new HashMap<>();
+
+        @Override
+        public void initialize(SwapManagerInitializationContext initializationContext) {
+        }
+
+        @Override
+        public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
+            Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
+            if (swapMap == null) {
+                swapMap = new HashMap<>();
+                swappedRecords.put(flowFileQueue, swapMap);
+            }
+
+            final String location = UUID.randomUUID().toString();
+            swapMap.put(location, new ArrayList<>(flowFiles));
+            return location;
+        }
+
+        @Override
+        public List<FlowFileRecord> peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
+            Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
+            if (swapMap == null) {
+                return null;
+            }
+
+            return Collections.unmodifiableList(swapMap.get(swapLocation));
+        }
+
+        @Override
+        public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
+            Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
+            if (swapMap == null) {
+                return null;
+            }
+
+            return swapMap.remove(swapLocation);
+        }
+
+        @Override
+        public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
+            Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
+            if (swapMap == null) {
+                return null;
+            }
+
+            return new ArrayList<>(swapMap.keySet());
+        }
+
+        @Override
+        @SuppressWarnings("deprecation")
+        public SwapSummary getSwapSummary(String swapLocation) throws IOException {
+            List<FlowFileRecord> records = null;
+            for (final Map<String, List<FlowFileRecord>> swapMap : swappedRecords.values()) {
+                records = swapMap.get(swapLocation);
+                if (records != null) {
+                    break;
+                }
+            }
+
+            if (records == null) {
+                return null;
+            }
+
+            final List<ResourceClaim> resourceClaims = new ArrayList<>();
+            long size = 0L;
+            Long maxId = null;
+            for (final FlowFileRecord flowFile : records) {
+                size += flowFile.getSize();
+
+                if (maxId == null || flowFile.getId() > maxId) {
+                    maxId = flowFile.getId();
+                }
+
+                final ContentClaim contentClaim = flowFile.getContentClaim();
+                if (contentClaim != null) {
+                    final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+                    resourceClaims.add(resourceClaim);
+                }
+            }
+
+            return new StandardSwapSummary(new QueueSize(records.size(), size), maxId, resourceClaims);
+        }
+
+        @Override
+        public void purge() {
+            this.swappedRecords.clear();
+        }
+
+    }
 }