You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/10/31 14:03:16 UTC

nifi git commit: NIFI-2920 This is a 0.x version of Mark Payne's patch for NIFI-2925

Repository: nifi
Updated Branches:
  refs/heads/0.x 05d5fbad6 -> a9395bc67


NIFI-2920 This is a 0.x version of Mark Payne's patch for NIFI-2925


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a9395bc6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a9395bc6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a9395bc6

Branch: refs/heads/0.x
Commit: a9395bc67c2de4fa6ba82ca0573ad210ccc67434
Parents: 05d5fba
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Oct 27 14:31:53 2016 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Oct 31 10:02:51 2016 -0400

----------------------------------------------------------------------
 .../repository/claim/ResourceClaimManager.java  | 14 ++++-
 .../nifi/controller/FileSystemSwapManager.java  |  9 +++-
 .../apache/nifi/controller/FlowController.java  | 11 ++--
 .../repository/FileSystemRepository.java        |  5 +-
 .../repository/VolatileContentRepository.java   |  2 +-
 .../WriteAheadFlowFileRepository.java           |  2 +-
 .../claim/StandardResourceClaimManager.java     | 54 +++++++++++++++-----
 .../controller/TestFileSystemSwapManager.java   |  7 ++-
 .../repository/TestStandardProcessSession.java  | 20 ++++----
 .../TestVolatileContentRepository.java          |  2 +-
 .../TestWriteAheadFlowFileRepository.java       |  4 +-
 .../claim/TestStandardResourceClaimManager.java |  2 +-
 12 files changed, 94 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
index b430df0..68643f9 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -32,9 +32,21 @@ public interface ResourceClaimManager {
      * @param container of claim
      * @param section of claim
      * @param lossTolerant of claim
+     * @param writable whether or not the claim should be made writable
      * @return new claim
      */
-    ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant);
+    ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable);
+
+    /**
+     * Returns the Resource Claim with the given id, container, and section, if one exists, <code>null</code> otherwise
+     *
+     * @param id of claim
+     * @param container of claim
+     * @param section of claim
+     * @return the existing resource claim or <code>null</code> if none exists
+     */
+    ResourceClaim getResourceClaim(String container, String section, String id);
+
 
     /**
      * @param claim to obtain reference count for

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 156389b..15a90cb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -492,7 +492,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                         lossTolerant = false;
                     }
 
-                    resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+                    resourceClaim = claimManager.getResourceClaim(container, section, claimId);
+                    if (resourceClaim == null) {
+                        logger.error("Swap file indicates that FlowFile was referencing Resource Claim at container={}, section={}, claimId={}, "
+                            + "but this Resource Claim cannot be found! Will create a temporary Resource Claim, but this may affect the framework's "
+                            + "ability to properly clean up this resource", container, section, claimId);
+                        resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, true);
+                    }
+
                     final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
                     claim.setLength(resourceLength);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 3eff44c..0895006 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -3349,7 +3349,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     return null;
                 }
 
-                final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false);
+                final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false, false);
                 return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue());
             }
 
@@ -3375,7 +3375,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
 
             final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
-                provEvent.getPreviousContentClaimIdentifier(), false);
+                provEvent.getPreviousContentClaimIdentifier(), false, false);
+
             claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
             offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
             size = provEvent.getPreviousFileSize();
@@ -3385,7 +3386,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
 
             final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
-                provEvent.getContentClaimIdentifier(), false);
+                provEvent.getContentClaimIdentifier(), false, false);
 
             claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
             offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset();
@@ -3478,7 +3479,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         try {
-            final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
+            final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false, false);
             final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
 
             if (!contentRepository.isAccessible(contentClaim)) {
@@ -3559,7 +3560,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         // Create the ContentClaim
         final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
-            event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
+            event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false);
 
         // Increment Claimant Count, since we will now be referencing the Content Claim
         resourceClaimManager.incrementClaimantCount(resourceClaim);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 8a3ac6d..d94869f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -426,7 +426,7 @@ public class FileSystemRepository implements ContentRepository {
         final String id = idPath.toFile().getName();
         final String sectionName = sectionPath.toFile().getName();
 
-        final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false);
+        final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false, false);
         if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) {
             removeIncompleteContent(fileToRemove);
         }
@@ -524,7 +524,7 @@ public class FileSystemRepository implements ContentRepository {
             final String section = String.valueOf(modulatedSectionIndex);
             final String claimId = System.currentTimeMillis() + "-" + currentIndex;
 
-            resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant);
+            resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant, true);
             resourceOffset = 0L;
             LOG.debug("Creating new Resource Claim {}", resourceClaim);
 
@@ -939,6 +939,7 @@ public class FileSystemRepository implements ContentRepository {
                         LOG.debug("Claim length less than max; Adding {} back to Writable Claim Queue", this);
                     } else {
                         writableClaimStreams.remove(scc.getResourceClaim());
+                        resourceClaimManager.freeze(scc.getResourceClaim());
 
                         bcos.close();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 08b7e80..1f75320 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -203,7 +203,7 @@ public class VolatileContentRepository implements ContentRepository {
 
     private ContentClaim createLossTolerant() {
         final long id = idGenerator.getAndIncrement();
-        final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true);
+        final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true, false);
         final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L);
         final ContentBlock contentBlock = new ContentBlock(claim, repoSize);
         claimManager.incrementClaimantCount(resourceClaim, true);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 27d6c9b..ca52544 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -779,7 +779,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
                     lossTolerant = false;
                 }
 
-                final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+                final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false);
                 final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
                 contentClaim.setLength(resourceLength);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
index 9cb0fa1..be0e8b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -29,14 +29,25 @@ import org.slf4j.LoggerFactory;
 
 public class StandardResourceClaimManager implements ResourceClaimManager {
 
-    private static final ConcurrentMap<ResourceClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
+    private static final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>();
     private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class);
 
     private static final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
 
     @Override
-    public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) {
-        return new StandardResourceClaim(this, container, section, id, lossTolerant);
+    public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant, final boolean writable) {
+        final StandardResourceClaim claim = new StandardResourceClaim(this, container, section, id, lossTolerant);
+        if (!writable) {
+            claim.freeze();
+        }
+        return claim;
+    }
+
+    @Override
+    public ResourceClaim getResourceClaim(final String container, final String section, final String id) {
+        final ResourceClaim tempClaim = new StandardResourceClaim(this, container, section, id, false);
+        final ClaimCount count = claimantCounts.get(tempClaim);
+        return (count == null) ? null : count.getClaim();
     }
 
     private static AtomicInteger getCounter(final ResourceClaim claim) {
@@ -44,14 +55,14 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
             return null;
         }
 
-        AtomicInteger counter = claimantCounts.get(claim);
+        ClaimCount counter = claimantCounts.get(claim);
         if (counter != null) {
-            return counter;
+            return counter.getCount();
         }
 
-        counter = new AtomicInteger(0);
-        final AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter);
-        return existingCounter == null ? counter : existingCounter;
+        counter = new ClaimCount(claim, new AtomicInteger(0));
+        final ClaimCount existingCounter = claimantCounts.putIfAbsent(claim, counter);
+        return existingCounter == null ? counter.getCount() : existingCounter.getCount();
     }
 
     @Override
@@ -61,8 +72,8 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
         }
 
         synchronized (claim) {
-            final AtomicInteger counter = claimantCounts.get(claim);
-            return counter == null ? 0 : counter.get();
+            final ClaimCount counter = claimantCounts.get(claim);
+            return counter == null ? 0 : counter.getCount().get();
         }
     }
 
@@ -73,13 +84,13 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
         }
 
         synchronized (claim) {
-            final AtomicInteger counter = claimantCounts.get(claim);
+            final ClaimCount counter = claimantCounts.get(claim);
             if (counter == null) {
                 logger.warn("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim);
                 return -1;
             }
 
-            final int newClaimantCount = counter.decrementAndGet();
+            final int newClaimantCount = counter.getCount().decrementAndGet();
             if (newClaimantCount < 0) {
                 logger.error("Decremented claimant count for {} to {}", claim, newClaimantCount);
             } else {
@@ -178,4 +189,23 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
 
         ((StandardResourceClaim) claim).freeze();
     }
+
+
+    private static final class ClaimCount {
+        private final ResourceClaim claim;
+        private final AtomicInteger count;
+
+        public ClaimCount(final ResourceClaim claim, final AtomicInteger count) {
+            this.claim = claim;
+            this.count = count;
+        }
+
+        public AtomicInteger getCount() {
+            return count;
+        }
+
+        public ResourceClaim getClaim() {
+            return claim;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 2ab8e35..24b61b9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -120,7 +120,12 @@ public class TestFileSystemSwapManager {
     public class NopResourceClaimManager implements ResourceClaimManager {
 
         @Override
-        public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant) {
+        public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable) {
+            return null;
+        }
+
+        @Override
+        public ResourceClaim getResourceClaim(String container, String section, String id) {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 2a401c6..71bfd27 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -829,7 +829,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
             .size(1L)
             .build();
         flowFileQueue.put(flowFileRecord);
@@ -977,7 +977,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
             .size(1L)
             .build();
         flowFileQueue.put(flowFileRecord);
@@ -1001,7 +1001,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
             .build();
         flowFileQueue.put(flowFileRecord);
 
@@ -1017,7 +1017,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
             .contentClaimOffset(1000L)
             .size(1000L)
             .build();
@@ -1042,7 +1042,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
             .build();
 
         flowFileQueue.put(flowFileRecord);
@@ -1059,7 +1059,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
 
         .contentClaimOffset(1000L).size(1L).build();
         flowFileQueue.put(flowFileRecord2);
@@ -1128,7 +1128,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
 
         .contentClaimOffset(0L).size(0L).build();
         flowFileQueue.put(flowFileRecord);
@@ -1166,7 +1166,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
 
         .contentClaimOffset(0L).size(0L).build();
         flowFileQueue.put(flowFileRecord);
@@ -1395,7 +1395,7 @@ public class TestStandardProcessSession {
             final Set<ContentClaim> claims = new HashSet<>();
 
             for (long i = 0; i < idGenerator.get(); i++) {
-                final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container", "section", String.valueOf(i), false);
+                final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container", "section", String.valueOf(i), false, false);
                 final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
                 if (getClaimantCount(contentClaim) > 0) {
                     claims.add(contentClaim);
@@ -1407,7 +1407,7 @@ public class TestStandardProcessSession {
 
         @Override
         public ContentClaim create(boolean lossTolerant) throws IOException {
-            final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false);
+            final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false, false);
             final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
 
             claimantCounts.put(contentClaim, new AtomicInteger(1));

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
index 5733164..0b24c03 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
@@ -83,7 +83,7 @@ public class TestVolatileContentRepository {
 
         final ContentRepository mockRepo = Mockito.mock(ContentRepository.class);
         contentRepo.setBackupRepository(mockRepo);
-        final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true);
+        final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true, false);
         final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
         Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(contentClaim);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index ee26d1f..ca79fab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -86,10 +86,10 @@ public class TestWriteAheadFlowFileRepository {
         when(connection.getFlowFileQueue()).thenReturn(queue);
         queueProvider.addConnection(connection);
 
-        final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false);
+        final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false, false);
         final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L);
 
-        final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false);
+        final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false, false);
         final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L);
 
         // Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then,

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
index d29105a..867810e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
@@ -52,7 +52,7 @@ public class TestStandardResourceClaimManager {
             }
         };
 
-        final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section", "id", false);
+        final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section", "id", false, false);
         assertEquals(1, manager.incrementClaimantCount(resourceClaim)); // increment claimant count to 1.
 
         assertEquals(1, manager.getClaimantCount(resourceClaim));