You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/07/29 21:59:57 UTC

[1/3] nifi git commit: NIFI-744: Refactored ContentClaim into ContentClaim and ResourceClaim so that we can append to a single file in the FileSystemRepository even after a session is completed

Repository: nifi
Updated Branches:
  refs/heads/NIFI-744 [created] 50379fcc0


http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index f2df821..5ee5fb5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -43,11 +43,12 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.wali.MinimalLockingWriteAheadLog;
@@ -86,7 +87,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     // effectively final
     private WriteAheadRepository<RepositoryRecord> wal;
     private WriteAheadRecordSerde serde;
-    private ContentClaimManager claimManager;
+    private ResourceClaimManager claimManager;
 
     // WALI Provides the ability to register callbacks for when a Partition or the entire Repository is sync'ed with the underlying disk.
     // We keep track of this because we need to ensure that the ContentClaims are destroyed only after the FlowFile Repository has been
@@ -125,7 +126,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     }
 
     @Override
-    public void initialize(final ContentClaimManager claimManager) throws IOException {
+    public void initialize(final ResourceClaimManager claimManager) throws IOException {
         this.claimManager = claimManager;
 
         Files.createDirectories(flowFileRepositoryPath);
@@ -168,6 +169,32 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         updateRepository(records, alwaysSync);
     }
 
+    private void markDestructable(final ContentClaim contentClaim) {
+        if (contentClaim == null) {
+            return;
+        }
+
+        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+        if (resourceClaim == null) {
+            return;
+        }
+
+        claimManager.markDestructable(resourceClaim);
+    }
+
+    private int getClaimantCount(final ContentClaim claim) {
+        if (claim == null) {
+            return 0;
+        }
+
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+        if (resourceClaim == null) {
+            return 0;
+        }
+
+        return claimManager.getClaimantCount(resourceClaim);
+    }
+
     private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException {
         for (final RepositoryRecord record : records) {
             if (record.getType() != RepositoryRecordType.DELETE && record.getType() != RepositoryRecordType.CONTENTMISSING && record.getDestination() == null) {
@@ -190,17 +217,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         for (final RepositoryRecord record : records) {
             if (record.getType() == RepositoryRecordType.DELETE) {
                 // For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable
-                if (record.getCurrentClaim() != null && claimManager.getClaimantCount(record.getCurrentClaim()) <= 0) {
+                if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) {
                     claimsToAdd.add(record.getCurrentClaim());
                 }
 
                 // If the original claim is different than the current claim and the original claim has a claimant count <= 0, mark it as destructable.
-                if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
+                if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && getClaimantCount(record.getOriginalClaim()) <= 0) {
                     claimsToAdd.add(record.getOriginalClaim());
                 }
             } else if (record.getType() == RepositoryRecordType.UPDATE) {
                 // if we have an update, and the original is no longer needed, mark original as destructable
-                if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
+                if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && getClaimantCount(record.getOriginalClaim()) <= 0) {
                     claimsToAdd.add(record.getOriginalClaim());
                 }
             }
@@ -212,7 +239,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
             BlockingQueue<ContentClaim> claimQueue = claimsAwaitingDestruction.get(partitionKey);
             if (claimQueue == null) {
                 claimQueue = new LinkedBlockingQueue<>();
-                BlockingQueue<ContentClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
+                final BlockingQueue<ContentClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
                 if (existingClaimQueue != null) {
                     claimQueue = existingClaimQueue;
                 }
@@ -222,9 +249,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         }
     }
 
-    private void markDestructable(final ContentClaim claim) {
-        claimManager.markDestructable(claim);
-    }
 
     @Override
     public void onSync(final int partitionIndex) {
@@ -307,7 +331,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         for (final RepositoryRecord record : recordList) {
             final ContentClaim claim = record.getCurrentClaim();
             if (claim != null) {
-                claimManager.incrementClaimantCount(claim);
+                claimManager.incrementClaimantCount(claim.getResourceClaim());
             }
         }
 
@@ -339,7 +363,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
                     final long start = System.nanoTime();
                     final int numRecordsCheckpointed = checkpoint();
                     final long end = System.nanoTime();
-                    final long millis = TimeUnit.MILLISECONDS.convert((end - start), TimeUnit.NANOSECONDS);
+                    final long millis = TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS);
                     logger.info("Successfully checkpointed FlowFile Repository with {} records in {} milliseconds",
                             new Object[]{numRecordsCheckpointed, millis});
                 } catch (final IOException e) {
@@ -378,9 +402,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
         private Map<String, FlowFileQueue> flowFileQueueMap = null;
         private long recordsRestored = 0L;
-        private final ContentClaimManager claimManager;
+        private final ResourceClaimManager claimManager;
 
-        public WriteAheadRecordSerde(final ContentClaimManager claimManager) {
+        public WriteAheadRecordSerde(final ResourceClaimManager claimManager) {
             this.claimManager = claimManager;
         }
 
@@ -518,7 +542,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
             }
 
             final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
-            RepositoryRecord record = currentRecordStates.get(recordId);
+            final RepositoryRecord record = currentRecordStates.get(recordId);
             ffBuilder.id(recordId);
             if (record != null) {
                 ffBuilder.fromFlowFile(record.getCurrent());
@@ -705,11 +729,16 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
                 out.write(0);
             } else {
                 out.write(1);
-                writeString(claim.getId(), out);
-                writeString(claim.getContainer(), out);
-                writeString(claim.getSection(), out);
+
+                final ResourceClaim resourceClaim = claim.getResourceClaim();
+                writeString(resourceClaim.getId(), out);
+                writeString(resourceClaim.getContainer(), out);
+                writeString(resourceClaim.getSection(), out);
+                out.writeLong(claim.getOffset());
+                out.writeLong(claim.getLength());
+
                 out.writeLong(offset);
-                out.writeBoolean(claim.isLossTolerant());
+                out.writeBoolean(resourceClaim.isLossTolerant());
             }
         }
 
@@ -726,6 +755,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
                 final String container = readString(in);
                 final String section = readString(in);
+
+                final long resourceOffset;
+                final long resourceLength;
+                if (serializationVersion < 7) {
+                    resourceOffset = 0L;
+                    resourceLength = -1L;
+                } else {
+                    resourceOffset = in.readLong();
+                    resourceLength = in.readLong();
+                }
+
                 final long claimOffset = in.readLong();
 
                 final boolean lossTolerant;
@@ -735,8 +775,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
                     lossTolerant = false;
                 }
 
-                final ContentClaim existingClaim = claimManager.newContentClaim(container, section, claimId, lossTolerant);
-                ffBuilder.contentClaim(existingClaim);
+                final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+                final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
+                contentClaim.setLength(resourceLength);
+
+                ffBuilder.contentClaim(contentClaim);
                 ffBuilder.contentClaimOffset(claimOffset);
             } else if (claimExists == -1) {
                 throw new EOFException();
@@ -785,16 +828,16 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
                 throw new EOFException();
             }
             if (firstValue == 0xff && secondValue == 0xff) {
-                int ch1 = in.read();
-                int ch2 = in.read();
-                int ch3 = in.read();
-                int ch4 = in.read();
+                final int ch1 = in.read();
+                final int ch2 = in.read();
+                final int ch3 = in.read();
+                final int ch4 = in.read();
                 if ((ch1 | ch2 | ch3 | ch4) < 0) {
                     throw new EOFException();
                 }
-                return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
+                return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
             } else {
-                return ((firstValue << 8) + (secondValue));
+                return (firstValue << 8) + secondValue;
             }
         }
 
@@ -834,7 +877,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
         @Override
         public int getVersion() {
-            return 6;
+            return 7;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
index a8a6963..753e818 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.controller.repository.claim;
 
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * <p>
@@ -28,115 +27,89 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> {
 
-    private final String id;
-    private final String container;
-    private final String section;
-    private final boolean lossTolerant;
-    private final AtomicInteger claimantCount = new AtomicInteger(0);
+    private final ResourceClaim resourceClaim;
     private final int hashCode;
-
-    StandardContentClaim(final String container, final String section, final String id, final boolean lossTolerant) {
-        this.container = container.intern();
-        this.section = section.intern();
-        this.id = id;
-        this.lossTolerant = lossTolerant;
-
-        hashCode = (int) (17 + 19 * (id.hashCode()) + 19 * container.hashCode() + 19 * section.hashCode());
+    private volatile long offset;
+    private volatile long length;
+
+    public StandardContentClaim(final ResourceClaim resourceClaim, final long offset) {
+        this.resourceClaim = resourceClaim;
+        this.offset = offset;
+        this.length = -1L;
+        this.hashCode = calculateHashCode();
     }
 
-    @Override
-    public boolean isLossTolerant() {
-        return lossTolerant;
+    public void setLength(final long length) {
+        this.length = length;
     }
 
-    /**
-     * @return the unique identifier for this claim
-     */
-    @Override
-    public String getId() {
-        return id;
+    public void setOffset(final long offset) {
+        this.offset = offset;
     }
 
-    /**
-     * @return the container identifier in which this claim is held
-     */
-    @Override
-    public String getContainer() {
-        return container;
+    private int calculateHashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + hashCode;
+        result = prime * result + (int) (length ^ length >>> 32);
+        result = prime * result + (int) (offset ^ offset >>> 32);
+        result = prime * result + (resourceClaim == null ? 0 : resourceClaim.hashCode());
+        return result;
     }
 
-    /**
-     * @return the section within a given container the claim is held
-     */
     @Override
-    public String getSection() {
-        return section;
-    }
-
-    int getClaimantCount() {
-        return claimantCount.get();
-    }
-
-    int decrementClaimantCount() {
-        return claimantCount.decrementAndGet();
-    }
-
-    int incrementClaimantCount() {
-        return claimantCount.incrementAndGet();
+    public int hashCode() {
+        return this.hashCode;
     }
 
-    /**
-     * Provides the natural ordering for ContentClaim objects. By default they are sorted by their id, then container, then section
-     *
-     * @param other other claim
-     * @return x such that x <=1 if this is less than other;
-     * x=0 if this.equals(other);
-     * x >= 1 if this is greater than other
-     */
     @Override
-    public int compareTo(final ContentClaim other) {
-        final int idComparison = id.compareTo(other.getId());
-        if (idComparison != 0) {
-            return idComparison;
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
         }
 
-        final int containerComparison = container.compareTo(other.getContainer());
-        if (containerComparison != 0) {
-            return containerComparison;
+        if (obj == null) {
+            return false;
         }
 
-        return section.compareTo(other.getSection());
-    }
-
-    @Override
-    public boolean equals(final Object other) {
-        if (this == other) {
-            return true;
+        if (!(obj instanceof ContentClaim)) {
+            return false;
         }
 
-        if (other == null) {
+        final ContentClaim other = (ContentClaim) obj;
+        if (length != other.getLength()) {
             return false;
         }
-        if (hashCode != other.hashCode()) {
-            // We check hash code before instanceof because instanceof is fairly expensive and for
-            // StandardContentClaim, calling hashCode() simply returns a pre-calculated value.
+
+        if (offset != other.getOffset()) {
             return false;
         }
 
-        if (!(other instanceof ContentClaim)) {
-            return false;
+        return resourceClaim.equals(other.getResourceClaim());
+    }
+
+    @Override
+    public int compareTo(final ContentClaim o) {
+        final int resourceComp = resourceClaim.compareTo(o.getResourceClaim());
+        if (resourceComp != 0) {
+            return resourceComp;
         }
-        final ContentClaim otherClaim = (ContentClaim) other;
-        return id.equals(otherClaim.getId()) && container.equals(otherClaim.getContainer()) && section.equals(otherClaim.getSection());
+
+        return Long.compare(offset, o.getOffset());
     }
 
     @Override
-    public int hashCode() {
-        return hashCode;
+    public ResourceClaim getResourceClaim() {
+        return resourceClaim;
+    }
+
+    @Override
+    public long getOffset() {
+        return offset;
     }
 
     @Override
-    public String toString() {
-        return "ContentClaim[id=" + id + ", container=" + container + ", section=" + section + "]";
+    public long getLength() {
+        return length;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
deleted file mode 100644
index b68f95e..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.claim;
-
-import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StandardContentClaimManager implements ContentClaimManager {
-
-    private static final ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
-    private static final Logger logger = LoggerFactory.getLogger(StandardContentClaimManager.class);
-
-    private static final BlockingQueue<ContentClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
-
-    @Override
-    public ContentClaim newContentClaim(final String container, final String section, final String id, final boolean lossTolerant) {
-        return new StandardContentClaim(container, section, id, lossTolerant);
-    }
-
-    private static AtomicInteger getCounter(final ContentClaim claim) {
-        if (claim == null) {
-            return null;
-        }
-
-        AtomicInteger counter = claimantCounts.get(claim);
-        if (counter != null) {
-            return counter;
-        }
-
-        counter = new AtomicInteger(0);
-        AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter);
-        return (existingCounter == null) ? counter : existingCounter;
-    }
-
-    @Override
-    public int getClaimantCount(final ContentClaim claim) {
-        if (claim == null) {
-            return 0;
-        }
-        final AtomicInteger counter = claimantCounts.get(claim);
-        return (counter == null) ? 0 : counter.get();
-    }
-
-    @Override
-    public int decrementClaimantCount(final ContentClaim claim) {
-        if (claim == null) {
-            return 0;
-        }
-
-        final AtomicInteger counter = claimantCounts.get(claim);
-        if (counter == null) {
-            logger.debug("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim);
-            return -1;
-        }
-
-        final int newClaimantCount = counter.decrementAndGet();
-        logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount);
-        if (newClaimantCount == 0) {
-            claimantCounts.remove(claim);
-        }
-        return newClaimantCount;
-    }
-
-    @Override
-    public int incrementClaimantCount(final ContentClaim claim) {
-        return incrementClaimantCount(claim, false);
-    }
-
-    @Override
-    public int incrementClaimantCount(final ContentClaim claim, final boolean newClaim) {
-        final AtomicInteger counter = getCounter(claim);
-
-        final int newClaimantCount = counter.incrementAndGet();
-        logger.debug("Incrementing claimant count for {} to {}", claim, newClaimantCount);
-        // If the claimant count moved from 0 to 1, remove it from the queue of destructable claims.
-        if (!newClaim && newClaimantCount == 1) {
-            destructableClaims.remove(claim);
-        }
-        return newClaimantCount;
-    }
-
-    @Override
-    public void markDestructable(final ContentClaim claim) {
-        if (claim == null) {
-            return;
-        }
-
-        if (getClaimantCount(claim) > 0) {
-            return;
-        }
-
-        logger.debug("Marking claim {} as destructable", claim);
-        try {
-            while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) {
-            }
-        } catch (final InterruptedException ie) {
-        }
-    }
-
-    @Override
-    public void drainDestructableClaims(final Collection<ContentClaim> destination, final int maxElements) {
-        final int drainedCount = destructableClaims.drainTo(destination, maxElements);
-        logger.debug("Drained {} destructable claims to {}", drainedCount, destination);
-    }
-
-    @Override
-    public void drainDestructableClaims(final Collection<ContentClaim> destination, final int maxElements, final long timeout, final TimeUnit unit) {
-        try {
-            final ContentClaim firstClaim = destructableClaims.poll(timeout, unit);
-            if (firstClaim != null) {
-                destination.add(firstClaim);
-                destructableClaims.drainTo(destination, maxElements - 1);
-            }
-        } catch (final InterruptedException e) {
-        }
-    }
-
-    @Override
-    public void purge() {
-        claimantCounts.clear();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
new file mode 100644
index 0000000..bd3ed5a
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class StandardResourceClaim implements ResourceClaim, Comparable<ResourceClaim> {
+    private final String id;
+    private final String container;
+    private final String section;
+    private final boolean lossTolerant;
+    private final AtomicInteger claimantCount = new AtomicInteger(0);
+    private final int hashCode;
+
+    public StandardResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) {
+        this.container = container.intern();
+        this.section = section.intern();
+        this.id = id;
+        this.lossTolerant = lossTolerant;
+
+        hashCode = 17 + 19 * id.hashCode() + 19 * container.hashCode() + 19 * section.hashCode();
+    }
+
+    @Override
+    public boolean isLossTolerant() {
+        return lossTolerant;
+    }
+
+    /**
+     * @return the unique identifier for this claim
+     */
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * @return the container identifier in which this claim is held
+     */
+    @Override
+    public String getContainer() {
+        return container;
+    }
+
+    /**
+     * @return the section within a given container the claim is held
+     */
+    @Override
+    public String getSection() {
+        return section;
+    }
+
+    int getClaimantCount() {
+        return claimantCount.get();
+    }
+
+    int decrementClaimantCount() {
+        return claimantCount.decrementAndGet();
+    }
+
+    int incrementClaimantCount() {
+        return claimantCount.incrementAndGet();
+    }
+
+    /**
+     * Provides the natural ordering for ResourceClaim objects. By default they are sorted by their id, then container, then section
+     *
+     * @param other other claim
+     * @return x such that x <=1 if this is less than other;
+     *         x=0 if this.equals(other);
+     *         x >= 1 if this is greater than other
+     */
+    @Override
+    public int compareTo(final ResourceClaim other) {
+        final int idComparison = id.compareTo(other.getId());
+        if (idComparison != 0) {
+            return idComparison;
+        }
+
+        final int containerComparison = container.compareTo(other.getContainer());
+        if (containerComparison != 0) {
+            return containerComparison;
+        }
+
+        return section.compareTo(other.getSection());
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null) {
+            return false;
+        }
+        if (hashCode != other.hashCode()) {
+            // We check hash code before instanceof because instanceof is fairly expensive and for
+            // StandardResourceClaim, calling hashCode() simply returns a pre-calculated value.
+            return false;
+        }
+
+        if (!(other instanceof ResourceClaim)) {
+            return false;
+        }
+        final ResourceClaim otherClaim = (ResourceClaim) other;
+        return id.equals(otherClaim.getId()) && container.equals(otherClaim.getContainer()) && section.equals(otherClaim.getSection());
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public String toString() {
+        return "StandardResourceClaim[id=" + id + ", container=" + container + ", section=" + section + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
new file mode 100644
index 0000000..4826ac3
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StandardResourceClaimManager implements ResourceClaimManager {
+
+    private static final ConcurrentMap<ResourceClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
+    private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class);
+
+    private static final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
+
+    @Override
+    public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) {
+        return new StandardResourceClaim(container, section, id, lossTolerant);
+    }
+
+    private static AtomicInteger getCounter(final ResourceClaim claim) {
+        if (claim == null) {
+            return null;
+        }
+
+        AtomicInteger counter = claimantCounts.get(claim);
+        if (counter != null) {
+            return counter;
+        }
+
+        counter = new AtomicInteger(0);
+        final AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter);
+        return existingCounter == null ? counter : existingCounter;
+    }
+
+    @Override
+    public int getClaimantCount(final ResourceClaim claim) {
+        if (claim == null) {
+            return 0;
+        }
+        final AtomicInteger counter = claimantCounts.get(claim);
+        return counter == null ? 0 : counter.get();
+    }
+
+    @Override
+    public int decrementClaimantCount(final ResourceClaim claim) {
+        if (claim == null) {
+            return 0;
+        }
+
+        final AtomicInteger counter = claimantCounts.get(claim);
+        if (counter == null) {
+            logger.debug("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim);
+            return -1;
+        }
+
+        final int newClaimantCount = counter.decrementAndGet();
+        logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount);
+        if (newClaimantCount == 0) {
+            claimantCounts.remove(claim);
+        }
+        return newClaimantCount;
+    }
+
+    @Override
+    public int incrementClaimantCount(final ResourceClaim claim) {
+        return incrementClaimantCount(claim, false);
+    }
+
+    @Override
+    public int incrementClaimantCount(final ResourceClaim claim, final boolean newClaim) {
+        final AtomicInteger counter = getCounter(claim);
+
+        final int newClaimantCount = counter.incrementAndGet();
+        logger.debug("Incrementing claimant count for {} to {}", claim, newClaimantCount);
+        // If the claimant count moved from 0 to 1, remove it from the queue of destructable claims.
+        if (!newClaim && newClaimantCount == 1) {
+            destructableClaims.remove(claim);
+        }
+        return newClaimantCount;
+    }
+
+    @Override
+    public void markDestructable(final ResourceClaim claim) {
+        if (claim == null) {
+            return;
+        }
+
+        if (getClaimantCount(claim) > 0) {
+            return;
+        }
+
+        logger.debug("Marking claim {} as destructable", claim);
+        try {
+            while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) {
+            }
+        } catch (final InterruptedException ie) {
+        }
+    }
+
+    @Override
+    public void drainDestructableClaims(final Collection<ResourceClaim> destination, final int maxElements) {
+        final int drainedCount = destructableClaims.drainTo(destination, maxElements);
+        logger.debug("Drained {} destructable claims to {}", drainedCount, destination);
+    }
+
+    @Override
+    public void drainDestructableClaims(final Collection<ResourceClaim> destination, final int maxElements, final long timeout, final TimeUnit unit) {
+        try {
+            final ResourceClaim firstClaim = destructableClaims.poll(timeout, unit);
+            if (firstClaim != null) {
+                destination.add(firstClaim);
+                destructableClaims.drainTo(destination, maxElements - 1);
+            }
+        } catch (final InterruptedException e) {
+        }
+    }
+
+    @Override
+    public void purge() {
+        claimantCounts.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 364dcad..a17bd40 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -30,8 +30,8 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -48,7 +48,7 @@ public class TestFileSystemSwapManager {
             final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
             Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
 
-            final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, new NopContentClaimManager());
+            final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, new NopResourceClaimManager());
             assertEquals(10000, records.size());
 
             for (final FlowFileRecord record : records) {
@@ -58,43 +58,43 @@ public class TestFileSystemSwapManager {
         }
     }
 
-    public class NopContentClaimManager implements ContentClaimManager {
+    public class NopResourceClaimManager implements ResourceClaimManager {
 
         @Override
-        public ContentClaim newContentClaim(String container, String section, String id, boolean lossTolerant) {
+        public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant) {
             return null;
         }
 
         @Override
-        public int getClaimantCount(ContentClaim claim) {
+        public int getClaimantCount(ResourceClaim claim) {
             return 0;
         }
 
         @Override
-        public int decrementClaimantCount(ContentClaim claim) {
+        public int decrementClaimantCount(ResourceClaim claim) {
             return 0;
         }
 
         @Override
-        public int incrementClaimantCount(ContentClaim claim) {
+        public int incrementClaimantCount(ResourceClaim claim) {
             return 0;
         }
 
         @Override
-        public int incrementClaimantCount(ContentClaim claim, boolean newClaim) {
+        public int incrementClaimantCount(ResourceClaim claim, boolean newClaim) {
             return 0;
         }
 
         @Override
-        public void markDestructable(ContentClaim claim) {
+        public void markDestructable(ResourceClaim claim) {
         }
 
         @Override
-        public void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements) {
+        public void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements) {
         }
 
         @Override
-        public void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit) {
+        public void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit) {
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index ada0775..519ba9c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -16,11 +16,10 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.repository.FileSystemRepository;
-import org.apache.nifi.controller.repository.ContentNotFoundException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
@@ -37,12 +36,15 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.util.DiskUtils;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.NiFiProperties;
-
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -53,20 +55,25 @@ public class TestFileSystemRepository {
     public static final File helloWorldFile = new File("src/test/resources/hello.txt");
 
     private FileSystemRepository repository = null;
+    private final File rootFile = new File("target/content_repository");
 
     @Before
     public void setup() throws IOException {
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
-        final File repo = new File("target/content_repository");
-        if (repo.exists()) {
-            DiskUtils.deleteRecursively(repo);
+        if (rootFile.exists()) {
+            DiskUtils.deleteRecursively(rootFile);
         }
 
         repository = new FileSystemRepository();
-        repository.initialize(new StandardContentClaimManager());
+        repository.initialize(new StandardResourceClaimManager());
         repository.purge();
     }
 
+    @After
+    public void shutdown() throws IOException {
+        repository.shutdown();
+    }
+
     @Test
     public void testCreateContentClaim() throws IOException {
         // value passed to #create is irrelevant because the FileSystemRepository does not currently support loss tolerance.
@@ -98,6 +105,127 @@ public class TestFileSystemRepository {
     }
 
     @Test
+    public void testResourceClaimReused() throws IOException {
+        final ContentClaim claim1 = repository.create(false);
+        final ContentClaim claim2 = repository.create(false);
+
+        // should not be equal because claim1 may still be in use
+        assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim());
+
+        try (final OutputStream out = repository.write(claim1)) {
+        }
+
+        final ContentClaim claim3 = repository.create(false);
+        assertEquals(claim1.getResourceClaim(), claim3.getResourceClaim());
+    }
+
+    @Test
+    public void testResourceClaimNotReusedAfterRestart() throws IOException, InterruptedException {
+        final ContentClaim claim1 = repository.create(false);
+        try (final OutputStream out = repository.write(claim1)) {
+        }
+
+        repository.shutdown();
+        Thread.sleep(1000L);
+
+        repository = new FileSystemRepository();
+        repository.initialize(new StandardResourceClaimManager());
+        repository.purge();
+
+        final ContentClaim claim2 = repository.create(false);
+        assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim());
+    }
+
+    @Test
+    public void testRewriteContentClaim() throws IOException {
+        final ContentClaim claim1 = repository.create(false);
+        assertEquals(1, repository.getClaimantCount(claim1));
+
+        try (final OutputStream out = repository.write(claim1)) {
+            out.write("abc".getBytes());
+        }
+        assertEquals(1, repository.getClaimantCount(claim1));
+
+        try (final OutputStream out = repository.write(claim1)) {
+            out.write("cba".getBytes());
+        }
+        assertEquals(1, repository.getClaimantCount(claim1));
+
+        try (final InputStream in = repository.read(claim1)) {
+            assertEquals('c', in.read());
+            assertEquals('b', in.read());
+            assertEquals('a', in.read());
+        }
+        assertEquals(1, repository.getClaimantCount(claim1));
+
+        assertEquals(3, repository.size(claim1));
+
+        final byte[] oneMB = new byte[1024 * 1024 - 6];
+        new Random().nextBytes(oneMB);
+        try (final OutputStream out = repository.write(claim1)) {
+            out.write(oneMB);
+        }
+        assertEquals(1, repository.getClaimantCount(claim1));
+
+        assertEquals(1024 * 1024 - 6, repository.size(claim1));
+        try (final InputStream in = repository.read(claim1)) {
+            final byte[] buff = new byte[oneMB.length];
+            StreamUtils.fillBuffer(in, buff);
+            assertTrue(Arrays.equals(buff, oneMB));
+        }
+
+        final ResourceClaim resourceClaim = claim1.getResourceClaim();
+        final Path path = rootFile.toPath().resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
+        assertTrue(Files.exists(path));
+        assertEquals(0, repository.decrementClaimantCount(claim1));
+        assertTrue(repository.remove(claim1));
+        assertFalse(Files.exists(path));
+
+        final ContentClaim claim2 = repository.create(false);
+        assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim());
+    }
+
+    @Test
+    public void testWriteWithNoContent() throws IOException {
+        final ContentClaim claim1 = repository.create(false);
+        try (final OutputStream out = repository.write(claim1)) {
+            out.write("Hello".getBytes());
+        }
+
+        final ContentClaim claim2 = repository.create(false);
+        assertEquals(claim1.getResourceClaim(), claim2.getResourceClaim());
+        try (final OutputStream out = repository.write(claim2)) {
+
+        }
+
+        final ContentClaim claim3 = repository.create(false);
+        assertEquals(claim1.getResourceClaim(), claim3.getResourceClaim());
+        try (final OutputStream out = repository.write(claim3)) {
+            out.write(" World".getBytes());
+        }
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final InputStream in = repository.read(claim1)) {
+            StreamUtils.copy(in, baos);
+        }
+
+        assertEquals("Hello", baos.toString());
+
+        baos.reset();
+        try (final InputStream in = repository.read(claim2)) {
+            StreamUtils.copy(in, baos);
+        }
+        assertEquals("", baos.toString());
+        assertEquals(0, baos.size());
+
+        baos.reset();
+        try (final InputStream in = repository.read(claim3)) {
+            StreamUtils.copy(in, baos);
+        }
+        assertEquals(" World", baos.toString());
+    }
+
+    @Test
     public void testRemoveDeletesFileIfNoClaimants() throws IOException {
         final ContentClaim claim = repository.create(true);
         assertNotNull(claim);
@@ -155,6 +283,40 @@ public class TestFileSystemRepository {
         final byte[] data = Files.readAllBytes(path);
         final byte[] expected = Files.readAllBytes(testFile.toPath());
         assertTrue(Arrays.equals(expected, data));
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final InputStream in = repository.read(claim)) {
+            StreamUtils.copy(in, baos);
+        }
+
+        assertTrue(Arrays.equals(expected, baos.toByteArray()));
+    }
+
+    @Test
+    public void testImportFromFileWithAppend() throws IOException {
+        final ContentClaim claim = repository.create(false);
+        final File hello = new File("src/test/resources/hello.txt");
+        final File goodbye = new File("src/test/resources/bye.txt");
+
+        repository.importFrom(hello.toPath(), claim, true);
+        assertContentEquals(claim, "Hello, World");
+
+        repository.importFrom(goodbye.toPath(), claim, true);
+        assertContentEquals(claim, "Hello, WorldGood-Bye, World!");
+
+        repository.importFrom(hello.toPath(), claim, true);
+        assertContentEquals(claim, "Hello, WorldGood-Bye, World!Hello, World");
+
+        repository.importFrom(goodbye.toPath(), claim, false);
+        assertContentEquals(claim, "Good-Bye, World!");
+    }
+
+    private void assertContentEquals(final ContentClaim claim, final String expected) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final InputStream in = repository.read(claim)) {
+            StreamUtils.copy(in, baos);
+        }
+        assertEquals(expected, new String(baos.toByteArray()));
     }
 
     @Test
@@ -314,9 +476,9 @@ public class TestFileSystemRepository {
         }
 
         final ContentClaim destination = repository.create(true);
-        final byte[] headerBytes = (header == null) ? null : header.getBytes();
-        final byte[] footerBytes = (footer == null) ? null : footer.getBytes();
-        final byte[] demarcatorBytes = (demarcator == null) ? null : demarcator.getBytes();
+        final byte[] headerBytes = header == null ? null : header.getBytes();
+        final byte[] footerBytes = footer == null ? null : footer.getBytes();
+        final byte[] demarcatorBytes = demarcator == null ? null : demarcator.getBytes();
         repository.merge(claims, destination, headerBytes, footerBytes, demarcatorBytes);
 
         final StringBuilder sb = new StringBuilder();
@@ -334,8 +496,12 @@ public class TestFileSystemRepository {
         }
         final String expectedText = sb.toString();
         final byte[] expected = expectedText.getBytes();
-        final Path path = getPath(destination);
-        final byte[] actual = Files.readAllBytes(path);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) destination.getLength());
+        try (final InputStream in = repository.read(destination)) {
+            StreamUtils.copy(in, baos);
+        }
+        final byte[] actual = baos.toByteArray();
         assertTrue(Arrays.equals(expected, actual));
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 3486875..b1fd4c7 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -51,8 +51,11 @@ import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.StandardFlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.Relationship;
@@ -179,7 +182,7 @@ public class TestStandardProcessSession {
         when(connectable.getConnections()).thenReturn(new HashSet<>(connList));
 
         contentRepo = new MockContentRepository();
-        contentRepo.initialize(new StandardContentClaimManager());
+        contentRepo.initialize(new StandardResourceClaimManager());
         flowFileRepo = new MockFlowFileRepository();
 
         final ProcessContext context = new ProcessContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
@@ -194,10 +197,10 @@ public class TestStandardProcessSession {
         assertEquals(1, contentRepo.getExistingClaims().size());
 
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .contentClaim(claim)
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
         flowFileQueue.put(flowFileRecord);
 
         FlowFile flowFile = session.get();
@@ -295,10 +298,10 @@ public class TestStandardProcessSession {
     public void testAppendAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .contentClaim(claim)
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
         flowFileQueue.put(flowFileRecord);
         FlowFile flowFile = session.get();
         assertNotNull(flowFile);
@@ -316,12 +319,12 @@ public class TestStandardProcessSession {
     public void testReadAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .contentClaim(claim)
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
         flowFileQueue.put(flowFileRecord);
-        FlowFile flowFile = session.get();
+        final FlowFile flowFile = session.get();
         assertNotNull(flowFile);
         final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null);
         session.read(flowFile, new InputStreamCallback() {
@@ -337,10 +340,10 @@ public class TestStandardProcessSession {
     public void testStreamAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .contentClaim(claim)
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
         flowFileQueue.put(flowFileRecord);
         FlowFile flowFile = session.get();
         assertNotNull(flowFile);
@@ -361,10 +364,10 @@ public class TestStandardProcessSession {
     public void testWriteAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .contentClaim(claim)
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
         flowFileQueue.put(flowFileRecord);
         FlowFile flowFile = session.get();
         assertNotNull(flowFile);
@@ -382,9 +385,9 @@ public class TestStandardProcessSession {
     public void testCreateThenRollbackRemovesContent() throws IOException {
 
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
         flowFileQueue.put(flowFileRecord);
 
         final StreamCallback nop = new StreamCallback() {
@@ -402,7 +405,7 @@ public class TestStandardProcessSession {
 
         session.write(flowFile2, nop);
 
-        FlowFile flowFile3 = session.create();
+        final FlowFile flowFile3 = session.create();
         session.write(flowFile3, nop);
 
         session.rollback();
@@ -412,14 +415,14 @@ public class TestStandardProcessSession {
     @Test
     public void testForksNotEmittedIfFilesDeleted() throws IOException {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
 
         flowFileQueue.put(flowFileRecord);
 
-        FlowFile orig = session.get();
-        FlowFile newFlowFile = session.create(orig);
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
         session.remove(newFlowFile);
         session.commit();
 
@@ -429,14 +432,14 @@ public class TestStandardProcessSession {
     @Test
     public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
 
         flowFileQueue.put(flowFileRecord);
 
-        FlowFile orig = session.get();
-        FlowFile newFlowFile = session.create(orig);
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
         session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
         session.commit();
 
@@ -446,15 +449,15 @@ public class TestStandardProcessSession {
     @Test
     public void testProvenanceEventsEmittedForRemove() throws IOException {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
 
         flowFileQueue.put(flowFileRecord);
 
-        FlowFile orig = session.get();
-        FlowFile newFlowFile = session.create(orig);
-        FlowFile secondNewFlowFile = session.create(orig);
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
+        final FlowFile secondNewFlowFile = session.create(orig);
         session.remove(newFlowFile);
         session.transfer(secondNewFlowFile, new Relationship.Builder().name("A").build());
         session.commit();
@@ -465,16 +468,16 @@ public class TestStandardProcessSession {
     @Test
     public void testUpdateAttributesThenJoin() throws IOException {
         final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
-                .id(1L)
-                .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .id(1L)
+            .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+            .entryDate(System.currentTimeMillis())
+            .build();
 
         final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
-                .id(2L)
-                .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .id(2L)
+            .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
+            .entryDate(System.currentTimeMillis())
+            .build();
 
         flowFileQueue.put(flowFileRecord1);
         flowFileQueue.put(flowFileRecord2);
@@ -538,17 +541,17 @@ public class TestStandardProcessSession {
     @Test
     public void testForkOneToOneReported() throws IOException {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
 
         flowFileQueue.put(flowFileRecord);
 
         // we have to increment the ID generator because we are creating a FlowFile without the FlowFile Repository's knowledge
         flowFileRepo.idGenerator.getAndIncrement();
 
-        FlowFile orig = session.get();
-        FlowFile newFlowFile = session.create(orig);
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
         session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
         session.getProvenanceReporter().fork(newFlowFile, Collections.singleton(orig));
         session.remove(orig);
@@ -566,7 +569,7 @@ public class TestStandardProcessSession {
 
     @Test
     public void testProcessExceptionThrownIfCallbackThrowsInOutputStreamCallback() {
-        FlowFile ff1 = session.create();
+        final FlowFile ff1 = session.create();
 
         final RuntimeException runtime = new RuntimeException();
         try {
@@ -610,7 +613,7 @@ public class TestStandardProcessSession {
 
     @Test
     public void testProcessExceptionThrownIfCallbackThrowsInStreamCallback() {
-        FlowFile ff1 = session.create();
+        final FlowFile ff1 = session.create();
 
         final RuntimeException runtime = new RuntimeException();
         try {
@@ -655,41 +658,16 @@ public class TestStandardProcessSession {
     @Test
     public void testMissingFlowFileExceptionThrownWhenUnableToReadData() {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "x";
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return "x";
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                })
-                .size(1L)
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
+            .size(1L)
+            .build();
         flowFileQueue.put(flowFileRecord);
 
         // attempt to read the data.
         try {
-            FlowFile ff1 = session.get();
+            final FlowFile ff1 = session.get();
 
             session.read(ff1, new InputStreamCallback() {
                 @Override
@@ -704,41 +682,16 @@ public class TestStandardProcessSession {
     @Test
     public void testMissingFlowFileExceptionThrownWhenUnableToReadDataStreamCallback() {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "x";
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return "x";
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                })
-                .size(1L)
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
+            .size(1L)
+            .build();
         flowFileQueue.put(flowFileRecord);
 
         // attempt to read the data.
         try {
-            FlowFile ff1 = session.get();
+            final FlowFile ff1 = session.get();
 
             session.write(ff1, new StreamCallback() {
                 @Override
@@ -753,35 +706,10 @@ public class TestStandardProcessSession {
     @Test
     public void testContentNotFoundExceptionThrownWhenUnableToReadDataStreamCallbackOffsetTooLarge() {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "container";
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return "section";
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                })
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
+            .build();
         flowFileQueue.put(flowFileRecord);
 
         FlowFile ff1 = session.get();
@@ -794,43 +722,18 @@ public class TestStandardProcessSession {
         session.commit();
 
         final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "container";
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return "section";
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                })
-                .contentClaimOffset(1000L)
-                .size(1000L)
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
+            .contentClaimOffset(1000L)
+            .size(1000L)
+            .build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.
         try {
             session.get();
-            FlowFile ff2 = session.get();
+            final FlowFile ff2 = session.get();
             session.write(ff2, new StreamCallback() {
                 @Override
                 public void process(InputStream in, OutputStream out) throws IOException {
@@ -844,34 +747,11 @@ public class TestStandardProcessSession {
     @Test
     public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "container";
-                    }
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
+            .build();
 
-                    @Override
-                    public String getSection() {
-                        return "section";
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                }).build();
         flowFileQueue.put(flowFileRecord);
 
         FlowFile ff1 = session.get();
@@ -884,41 +764,17 @@ public class TestStandardProcessSession {
         session.commit();
 
         final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "container";
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return "section";
-                    }
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
 
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                })
-                .contentClaimOffset(1000L).size(1L).build();
+            .contentClaimOffset(1000L).size(1L).build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.
         try {
             session.get();
-            FlowFile ff2 = session.get();
+            final FlowFile ff2 = session.get();
             session.read(ff2, new InputStreamCallback() {
                 @Override
                 public void process(InputStream in) throws IOException {
@@ -931,7 +787,7 @@ public class TestStandardProcessSession {
 
     @Test
     public void testProcessExceptionThrownIfCallbackThrowsInInputStreamCallback() {
-        FlowFile ff1 = session.create();
+        final FlowFile ff1 = session.create();
 
         final RuntimeException runtime = new RuntimeException();
         try {
@@ -975,7 +831,7 @@ public class TestStandardProcessSession {
 
     @Test
     public void testCreateEmitted() throws IOException {
-        FlowFile newFlowFile = session.create();
+        final FlowFile newFlowFile = session.create();
         session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
         session.commit();
 
@@ -1009,9 +865,9 @@ public class TestStandardProcessSession {
     @Test
     public void testContentModifiedEmittedAndNotAttributesModified() throws IOException {
         final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
-                .id(1L)
-                .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
-                .build();
+            .id(1L)
+            .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+            .build();
         this.flowFileQueue.put(flowFile);
 
         FlowFile existingFlowFile = session.get();
@@ -1035,9 +891,9 @@ public class TestStandardProcessSession {
     @Test
     public void testAttributesModifiedEmitted() throws IOException {
         final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
-                .id(1L)
-                .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
-                .build();
+            .id(1L)
+            .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+            .build();
         this.flowFileQueue.put(flowFile);
 
         FlowFile existingFlowFile = session.get();
@@ -1104,7 +960,7 @@ public class TestStandardProcessSession {
         }
 
         @Override
-        public void initialize(ContentClaimManager claimManager) throws IOException {
+        public void initialize(ResourceClaimManager claimManager) throws IOException {
         }
     }
 
@@ -1112,9 +968,9 @@ public class TestStandardProcessSession {
 
         private final AtomicLong idGenerator = new AtomicLong(0L);
         private final AtomicLong claimsRemoved = new AtomicLong(0L);
-        private ContentClaimManager claimManager;
+        private ResourceClaimManager claimManager;
 
-        private ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
+        private final ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
 
         @Override
         public void shutdown() {
@@ -1124,9 +980,10 @@ public class TestStandardProcessSession {
             final Set<ContentClaim> claims = new HashSet<>();
 
             for (long i = 0; i < idGenerator.get(); i++) {
-                final ContentClaim claim = claimManager.newContentClaim("container", "section", String.valueOf(i), false);
-                if (getClaimantCount(claim) > 0) {
-                    claims.add(claim);
+                final ResourceClaim resourceClaim = new StandardResourceClaim("container", "section", String.valueOf(i), false);
+                final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
+                if (getClaimantCount(contentClaim) > 0) {
+                    claims.add(contentClaim);
                 }
             }
 
@@ -1135,15 +992,17 @@ public class TestStandardProcessSession {
 
         @Override
         public ContentClaim create(boolean lossTolerant) throws IOException {
-            final ContentClaim claim = claimManager.newContentClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false);
-            claimantCounts.put(claim, new AtomicInteger(1));
-            final Path path = getPath(claim);
+            final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false);
+            final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
+
+            claimantCounts.put(contentClaim, new AtomicInteger(1));
+            final Path path = getPath(contentClaim);
             final Path parent = path.getParent();
             if (Files.exists(parent) == false) {
                 Files.createDirectories(parent);
             }
-            Files.createFile(getPath(claim));
-            return claim;
+            Files.createFile(getPath(contentClaim));
+            return contentClaim;
         }
 
         @Override
@@ -1219,7 +1078,8 @@ public class TestStandardProcessSession {
             return 0;
         }
 
-        private Path getPath(final ContentClaim claim) {
+        private Path getPath(final ContentClaim contentClaim) {
+            final ResourceClaim claim = contentClaim.getResourceClaim();
             return Paths.get("target").resolve("contentRepo").resolve(claim.getContainer()).resolve(claim.getSection()).resolve(claim.getId());
         }
 
@@ -1315,7 +1175,7 @@ public class TestStandardProcessSession {
         }
 
         @Override
-        public void initialize(ContentClaimManager claimManager) throws IOException {
+        public void initialize(ResourceClaimManager claimManager) throws IOException {
             this.claimManager = claimManager;
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
index a32f321..5733164 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.repository;
 
 import org.apache.nifi.controller.repository.VolatileContentRepository;
+
 import static org.junit.Assert.assertEquals;
 
 import java.io.ByteArrayOutputStream;
@@ -29,10 +30,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.util.NiFiProperties;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,11 +43,11 @@ import org.mockito.Mockito;
 
 public class TestVolatileContentRepository {
 
-    private ContentClaimManager claimManager;
+    private ResourceClaimManager claimManager;
 
     @Before
     public void setup() {
-        claimManager = new StandardContentClaimManager();
+        claimManager = new StandardResourceClaimManager();
     }
 
     @Test
@@ -81,8 +83,9 @@ public class TestVolatileContentRepository {
 
         final ContentRepository mockRepo = Mockito.mock(ContentRepository.class);
         contentRepo.setBackupRepository(mockRepo);
-        final ContentClaim newClaim = claimManager.newContentClaim("container", "section", "1000", true);
-        Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(newClaim);
+        final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true);
+        final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
+        Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(contentClaim);
 
         final ByteArrayOutputStream overflowStream = new ByteArrayOutputStream();
         Mockito.when(mockRepo.write(Matchers.any(ContentClaim.class))).thenReturn(overflowStream);

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 054ef5e..2138928 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -34,7 +34,7 @@ import java.util.List;
 
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.util.file.FileUtils;
 
 import org.junit.Test;
@@ -53,7 +53,7 @@ public class TestWriteAheadFlowFileRepository {
         }
 
         final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository();
-        repo.initialize(new StandardContentClaimManager());
+        repo.initialize(new StandardResourceClaimManager());
 
         final List<Connection> connectionList = new ArrayList<>();
         final QueueProvider queueProvider = new QueueProvider() {
@@ -119,7 +119,7 @@ public class TestWriteAheadFlowFileRepository {
 
         // restore
         final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository();
-        repo2.initialize(new StandardContentClaimManager());
+        repo2.initialize(new StandardResourceClaimManager());
         repo2.loadFlowFiles(queueProvider, 0L);
 
         assertEquals(1, flowFileCollection.size());

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/bye.txt
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/bye.txt b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/bye.txt
new file mode 100644
index 0000000..64926ce
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/bye.txt
@@ -0,0 +1 @@
+Good-Bye, World!
\ No newline at end of file


[3/3] nifi git commit: NIFI-744: Refactored ContentClaim into ContentClaim and ResourceClaim so that we can append to a single file in the FileSystemRepository even after a session is completed

Posted by ma...@apache.org.
NIFI-744: Refactored ContentClaim into ContentClaim and ResourceClaim so that we can append to a single file in the FileSystemRepository even after a session is completed


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/50379fcc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/50379fcc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/50379fcc

Branch: refs/heads/NIFI-744
Commit: 50379fcc06347f413f14615f710cda6d02a6c8ce
Parents: 75ed16c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jul 29 15:59:25 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jul 29 15:59:25 2015 -0400

----------------------------------------------------------------------
 .../repository/ContentRepository.java           |   4 +-
 .../repository/FlowFileRepository.java          |   4 +-
 .../repository/FlowFileSwapManager.java         |   6 +-
 .../repository/claim/ContentClaim.java          |  35 +-
 .../repository/claim/ContentClaimManager.java   | 142 -----
 .../repository/claim/ResourceClaim.java         |  54 ++
 .../repository/claim/ResourceClaimManager.java  | 135 +++++
 .../stream/io/ByteCountingOutputStream.java     |   4 +
 .../nifi/controller/FileSystemSwapManager.java  |  63 ++-
 .../apache/nifi/controller/FlowController.java  | 167 +++---
 .../repository/FileSystemRepository.java        | 566 ++++++++++++++-----
 .../repository/StandardFlowFileRecord.java      |   6 +-
 .../repository/StandardProcessSession.java      | 318 +++--------
 .../repository/VolatileContentRepository.java   |  49 +-
 .../repository/VolatileFlowFileRepository.java  |  46 +-
 .../WriteAheadFlowFileRepository.java           | 101 +++-
 .../repository/claim/StandardContentClaim.java  | 133 ++---
 .../claim/StandardContentClaimManager.java      | 145 -----
 .../repository/claim/StandardResourceClaim.java | 134 +++++
 .../claim/StandardResourceClaimManager.java     | 145 +++++
 .../controller/TestFileSystemSwapManager.java   |  24 +-
 .../repository/TestFileSystemRepository.java    | 192 ++++++-
 .../repository/TestStandardProcessSession.java  | 378 ++++---------
 .../TestVolatileContentRepository.java          |  17 +-
 .../TestWriteAheadFlowFileRepository.java       |   6 +-
 .../src/test/resources/bye.txt                  |   1 +
 26 files changed, 1646 insertions(+), 1229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index ee3ead9..da87d75 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -24,7 +24,7 @@ import java.util.Collection;
 import java.util.Set;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 /**
  * Defines the capabilities of a content repository. Append options are not
@@ -42,7 +42,7 @@ public interface ContentRepository {
      * @param claimManager to handle claims
      * @throws java.io.IOException if unable to init
      */
-    void initialize(ContentClaimManager claimManager) throws IOException;
+    void initialize(ResourceClaimManager claimManager) throws IOException;
 
     /**
      * Shuts down the Content Repository, freeing any resources that may be

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 5e59e04..58fc6b3 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -22,7 +22,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 /**
  * Implementations must be thread safe
@@ -38,7 +38,7 @@ public interface FlowFileRepository extends Closeable {
      * @param claimManager for handling claims
      * @throws java.io.IOException if unable to initialize repository
      */
-    void initialize(ContentClaimManager claimManager) throws IOException;
+    void initialize(ResourceClaimManager claimManager) throws IOException;
 
     /**
      * @return the maximum number of bytes that can be stored in the underlying

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 869e2b3..2e5be11 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
 
 /**
@@ -38,7 +38,7 @@ public interface FlowFileSwapManager {
      * @param reporter the EventReporter that can be used for notifying users of
      * important events
      */
-    void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter);
+    void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ResourceClaimManager claimManager, EventReporter reporter);
 
     /**
      * Shuts down the manager
@@ -59,5 +59,5 @@ public interface FlowFileSwapManager {
      * @param claimManager manager
      * @return how many flowfiles have been recovered
      */
-    long recoverSwappedFlowFiles(QueueProvider connectionProvider, ContentClaimManager claimManager);
+    long recoverSwappedFlowFiles(QueueProvider connectionProvider, ResourceClaimManager claimManager);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
index 53cc44f..6a2ef7f 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
@@ -18,35 +18,30 @@ package org.apache.nifi.controller.repository.claim;
 
 /**
  * <p>
- * A ContentClaim is a reference to a given flow file's content. Multiple flow
- * files may reference the same content by both having the same content
- * claim.</p>
+ * A reference to a section of a {@link ResourceClaim}, which may or may not encompass 
+ * the entire ResourceClaim. Multiple FlowFiles may reference the same content by both 
+ * having the same content claim.
+ * </p>
  *
  * <p>
- * Must be thread safe</p>
- *
+ * Must be thread safe
+ * </p>
  */
 public interface ContentClaim extends Comparable<ContentClaim> {
 
     /**
-     * @return the unique identifier for this claim
-     */
-    String getId();
-
-    /**
-     * @return the container identifier in which this claim is held
+     * @return the ResourceClaim that this ContentClaim references
      */
-    String getContainer();
-
+    ResourceClaim getResourceClaim();
+    
     /**
-     * @return the section within a given container the claim is held
+     * @return the offset into the ResourceClaim where the content for this
+     * claim begins
      */
-    String getSection();
-
+    long getOffset();
+    
     /**
-     * @return Indicates whether or not the Claim is loss-tolerant. If so, we will
-     * attempt to keep the content but will not sacrifice a great deal of
-     * performance to do so
+     * @return the length of this ContentClaim
      */
-    boolean isLossTolerant();
+    long getLength();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
deleted file mode 100644
index bffcec3..0000000
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.claim;
-
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Responsible for managing all ContentClaims that are used in the application
- */
-public interface ContentClaimManager {
-
-    /**
-     * Creates a new Content Claim with the given id, container, section, and
-     * loss tolerance.
-     *
-     * @param id of claim
-     * @param container of claim
-     * @param section of claim
-     * @param lossTolerant of claim
-     * @return new claim
-     */
-    ContentClaim newContentClaim(String container, String section, String id, boolean lossTolerant);
-
-    /**
-     * @param claim to obtain reference count for
-     * @return the number of FlowFiles that hold a claim to a particular piece
-     * of FlowFile content
-     */
-    int getClaimantCount(ContentClaim claim);
-
-    /**
-     * Decreases by 1 the count of how many FlowFiles hold a claim to a
-     * particular piece of FlowFile content and returns the new count
-     *
-     * @param claim to decrement claimants on
-     * @return new claimaint count
-     */
-    int decrementClaimantCount(ContentClaim claim);
-
-    /**
-     * Increases by 1 the count of how many FlowFiles hold a claim to a
-     * particular piece of FlowFile content and returns the new count
-     *
-     * @param claim to increment claims on
-     * @return new claimant count
-     */
-    int incrementClaimantCount(ContentClaim claim);
-
-    /**
-     * Increases by 1 the count of how many FlowFiles hold a claim to a
-     * particular piece of FlowFile content and returns the new count.
-     *
-     * If it is known that the Content Claim whose count is being incremented is
-     * a newly created ContentClaim, this method should be called with a value
-     * of {@code true} as the second argument, as it may allow the manager to
-     * optimize its tasks, knowing that the Content Claim cannot be referenced
-     * by any other component
-     *
-     * @param claim to increment
-     * @param newClaim provides a hint that no other process can have access to this
-     * claim right now
-     * @return new claim count
-     */
-    int incrementClaimantCount(ContentClaim claim, boolean newClaim);
-
-    /**
-     * Indicates that the given ContentClaim can now be destroyed by the
-     * appropriate Content Repository. This should be done only after it is
-     * guaranteed that the FlowFile Repository has been synchronized with its
-     * underlying storage component. This way, we avoid the following sequence
-     * of events:
-     * <ul>
-     * <li>FlowFile Repository is updated to indicate that FlowFile F no longer
-     * depends on ContentClaim C</li>
-     * <li>ContentClaim C is no longer needed and is destroyed</li>
-     * <li>The Operating System crashes or there is a power failure</li>
-     * <li>Upon restart, the FlowFile Repository was not synchronized with its
-     * underlying storage mechanism and as such indicates that FlowFile F needs
-     * ContentClaim C.</li>
-     * <li>Since ContentClaim C has already been destroyed, it is inaccessible,
-     * and FlowFile F's Content is not found, so the FlowFile is removed,
-     * resulting in data loss.</li>
-     * </ul>
-     *
-     * <p>
-     * Using this method of marking the ContentClaim as destructable only when
-     * the FlowFile repository has been synced with the underlying storage
-     * mechanism, we can ensure that on restart, we will not point to this
-     * unneeded claim. As such, it is now safe to destroy the contents.
-     * </p>
-     *
-     * @param claim to mark as now destructable
-     */
-    void markDestructable(ContentClaim claim);
-
-    /**
-     * Drains up to {@code maxElements} Content Claims from the internal queue
-     * of destructable content claims to the given {@code destination} so that
-     * they can be destroyed.
-     *
-     * @param destination to drain to
-     * @param maxElements max items to drain
-     */
-    void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements);
-
-    /**
-     * Drains up to {@code maxElements} Content Claims from the internal queue
-     * of destructable content claims to the given {@code destination} so that
-     * they can be destroyed. If no ContentClaim is ready to be destroyed at
-     * this time, will wait up to the specified amount of time before returning.
-     * If, after the specified amount of time, there is still no ContentClaim
-     * ready to be destroyed, the method will return without having added
-     * anything to the given {@code destination}.
-     *
-     * @param destination to drain to
-     * @param maxElements max items to drain
-     * @param timeout maximum time to wait
-     * @param unit unit of time to wait
-     */
-    void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit);
-
-    /**
-     * Clears the manager's memory of any and all ContentClaims that it knows
-     * about
-     */
-    void purge();
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
new file mode 100644
index 0000000..d448632
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import org.apache.nifi.controller.repository.ContentRepository;
+
+/**
+ * <p>
+ * Represents a resource that can be provided by a {@link ContentRepository}
+ * </p>
+ *
+ * <p>
+ * MUST BE THREAD-SAFE!
+ * </p>
+ */
+public interface ResourceClaim extends Comparable<ResourceClaim> {
+
+    /**
+     * @return the unique identifier for this claim
+     */
+    String getId();
+
+    /**
+     * @return the container identifier in which this claim is held
+     */
+    String getContainer();
+
+    /**
+     * @return the section within a given container the claim is held
+     */
+    String getSection();
+
+    /**
+     * @return Indicates whether or not the Claim is loss-tolerant. If so, we will
+     *         attempt to keep the content but will not sacrifice a great deal of
+     *         performance to do so
+     */
+    boolean isLossTolerant();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
new file mode 100644
index 0000000..01f4c65
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Responsible for managing all ResourceClaims that are used in the application
+ */
+public interface ResourceClaimManager {
+
+    /**
+     * Creates a new Resource Claim with the given id, container, section, and
+     * loss tolerance.
+     *
+     * @param id of claim
+     * @param container of claim
+     * @param section of claim
+     * @param lossTolerant of claim
+     * @return new claim
+     */
+    ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant);
+
+    /**
+     * @param claim to obtain reference count for
+     * @return the number of FlowFiles that hold a claim to a particular piece
+     * of FlowFile content
+     */
+    int getClaimantCount(ResourceClaim claim);
+
+    /**
+     * Decreases by 1 the count of how many FlowFiles hold a claim to a
+     * particular piece of FlowFile content and returns the new count
+     *
+     * @param claim to decrement claimants on
+     * @return new claimaint count
+     */
+    int decrementClaimantCount(ResourceClaim claim);
+
+    /**
+     * Increases by 1 the count of how many FlowFiles hold a claim to a
+     * particular piece of FlowFile content and returns the new count
+     *
+     * @param claim to increment claims on
+     * @return new claimant count
+     */
+    int incrementClaimantCount(ResourceClaim claim);
+
+    /**
+     * Increases by 1 the count of how many FlowFiles hold a claim to a
+     * particular piece of FlowFile content and returns the new count.
+     *
+     * If it is known that the Content Claim whose count is being incremented is
+     * a newly created ResourceClaim, this method should be called with a value
+     * of {@code true} as the second argument, as it may allow the manager to
+     * optimize its tasks, knowing that the Content Claim cannot be referenced
+     * by any other component
+     *
+     * @param claim to increment
+     * @param newClaim provides a hint that no other process can have access to this
+     *            claim right now
+     * @return new claim count
+     */
+    int incrementClaimantCount(ResourceClaim claim, boolean newClaim);
+
+    /**
+     * Indicates that the given ResourceClaim can now be destroyed by the
+     * appropriate Content Repository. This should be done only after it is
+     * guaranteed that the FlowFile Repository has been synchronized with its
+     * underlying storage component. This way, we avoid the following sequence
+     * of events:
+     * <ul>
+     * <li>FlowFile Repository is updated to indicate that FlowFile F no longer depends on ResourceClaim C</li>
+     * <li>ResourceClaim C is no longer needed and is destroyed</li>
+     * <li>The Operating System crashes or there is a power failure</li>
+     * <li>Upon restart, the FlowFile Repository was not synchronized with its underlying storage mechanism and as such indicates that FlowFile F needs ResourceClaim C.</li>
+     * <li>Since ResourceClaim C has already been destroyed, it is inaccessible, and FlowFile F's Content is not found, so the FlowFile is removed, resulting in data loss.</li>
+     * </ul>
+     *
+     * <p>
+     * Using this method of marking the ResourceClaim as destructable only when the FlowFile repository has been synced with the underlying storage mechanism, we can ensure that on restart, we will
+     * not point to this unneeded claim. As such, it is now safe to destroy the contents.
+     * </p>
+     *
+     * @param claim to mark as now destructable
+     */
+    void markDestructable(ResourceClaim claim);
+
+    /**
+     * Drains up to {@code maxElements} Content Claims from the internal queue
+     * of destructable content claims to the given {@code destination} so that
+     * they can be destroyed.
+     *
+     * @param destination to drain to
+     * @param maxElements max items to drain
+     */
+    void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements);
+
+    /**
+     * Drains up to {@code maxElements} Content Claims from the internal queue
+     * of destructable content claims to the given {@code destination} so that
+     * they can be destroyed. If no ResourceClaim is ready to be destroyed at
+     * this time, will wait up to the specified amount of time before returning.
+     * If, after the specified amount of time, there is still no ResourceClaim
+     * ready to be destroyed, the method will return without having added
+     * anything to the given {@code destination}.
+     *
+     * @param destination to drain to
+     * @param maxElements max items to drain
+     * @param timeout maximum time to wait
+     * @param unit unit of time to wait
+     */
+    void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit);
+
+    /**
+     * Clears the manager's memory of any and all ResourceClaims that it knows
+     * about
+     */
+    void purge();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
index 9bbd45e..4865e98 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
@@ -63,4 +63,8 @@ public class ByteCountingOutputStream extends OutputStream {
     public void close() throws IOException {
         out.close();
     }
+    
+    public OutputStream getWrappedStream() {
+    	return out;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 604dba9..c829566 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -61,7 +61,9 @@ import org.apache.nifi.controller.repository.FlowFileSwapManager;
 import org.apache.nifi.controller.repository.QueueProvider;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.stream.io.BufferedOutputStream;
@@ -83,7 +85,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
     private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
 
-    public static final int SWAP_ENCODING_VERSION = 6;
+    public static final int SWAP_ENCODING_VERSION = 7;
     public static final String EVENT_CATEGORY = "Swap FlowFiles";
 
     private final ScheduledExecutorService swapQueueIdentifierExecutor;
@@ -98,7 +100,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     private final long swapOutMillis;
     private final int swapOutThreadCount;
 
-    private ContentClaimManager claimManager; // effectively final
+    private ResourceClaimManager claimManager; // effectively final
 
     private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
 
@@ -138,7 +140,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     }
 
     @Override
-    public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) {
+    public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ResourceClaimManager claimManager, final EventReporter eventReporter) {
         this.claimManager = claimManager;
         this.flowFileRepository = flowFileRepository;
         this.eventReporter = eventReporter;
@@ -184,11 +186,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                     out.writeBoolean(false);
                 } else {
                     out.writeBoolean(true);
-                    out.writeUTF(claim.getId());
-                    out.writeUTF(claim.getContainer());
-                    out.writeUTF(claim.getSection());
+                    final ResourceClaim resourceClaim = claim.getResourceClaim();
+                    out.writeUTF(resourceClaim.getId());
+                    out.writeUTF(resourceClaim.getContainer());
+                    out.writeUTF(resourceClaim.getSection());
+                    out.writeLong(claim.getOffset());
+                    out.writeLong(claim.getLength());
                     out.writeLong(flowFile.getContentClaimOffset());
-                    out.writeBoolean(claim.isLossTolerant());
+                    out.writeBoolean(resourceClaim.isLossTolerant());
                 }
 
                 final Map<String, String> attributes = flowFile.getAttributes();
@@ -226,7 +231,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ContentClaimManager claimManager) throws IOException {
+    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
         final int swapEncodingVersion = in.readInt();
         if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
             throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
@@ -245,7 +250,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     }
 
     static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, final FlowFileQueue queue,
-            final int serializationVersion, final boolean incrementContentClaims, final ContentClaimManager claimManager) throws IOException {
+            final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException {
         final List<FlowFileRecord> flowFiles = new ArrayList<>();
         for (int i = 0; i < numFlowFiles; i++) {
             // legacy encoding had an "action" because it used to be couple with FlowFile Repository code
@@ -292,6 +297,17 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
 
                 final String container = in.readUTF();
                 final String section = in.readUTF();
+
+                final long resourceOffset;
+                final long resourceLength;
+                if (serializationVersion < 6) {
+                    resourceOffset = 0L;
+                    resourceLength = -1L;
+                } else {
+                    resourceOffset = in.readLong();
+                    resourceLength = in.readLong();
+                }
+
                 final long claimOffset = in.readLong();
 
                 final boolean lossTolerant;
@@ -301,10 +317,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                     lossTolerant = false;
                 }
 
-                final ContentClaim claim = claimManager.newContentClaim(container, section, claimId, lossTolerant);
+                final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+                final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
+                claim.setLength(resourceLength);
 
                 if (incrementContentClaims) {
-                    claimManager.incrementClaimantCount(claim);
+                    claimManager.incrementClaimantCount(resourceClaim);
                 }
 
                 ffBuilder.contentClaim(claim);
@@ -353,16 +371,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             throw new EOFException();
         }
         if (firstValue == 0xff && secondValue == 0xff) {
-            int ch1 = in.read();
-            int ch2 = in.read();
-            int ch3 = in.read();
-            int ch4 = in.read();
+            final int ch1 = in.read();
+            final int ch2 = in.read();
+            final int ch3 = in.read();
+            final int ch4 = in.read();
             if ((ch1 | ch2 | ch3 | ch4) < 0) {
                 throw new EOFException();
             }
-            return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
+            return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
         } else {
-            return ((firstValue << 8) + (secondValue));
+            return (firstValue << 8) + secondValue;
         }
     }
 
@@ -422,7 +440,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                 final FlowFileQueue flowFileQueue = entry.getKey();
 
                 // if queue is more than 60% of its swap threshold, don't swap flowfiles in
-                if (flowFileQueue.unswappedSize() >= ((float) flowFileQueue.getSwapThreshold() * 0.6F)) {
+                if (flowFileQueue.unswappedSize() >= flowFileQueue.getSwapThreshold() * 0.6F) {
                     continue;
                 }
 
@@ -432,7 +450,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                         final Queue<File> queue = queueLockWrapper.getQueue();
 
                         // Swap FlowFiles in until we hit 90% of the threshold, or until we're out of files.
-                        while (flowFileQueue.unswappedSize() < ((float) flowFileQueue.getSwapThreshold() * 0.9F)) {
+                        while (flowFileQueue.unswappedSize() < flowFileQueue.getSwapThreshold() * 0.9F) {
                             File swapFile = null;
                             try {
                                 swapFile = queue.poll();
@@ -545,7 +563,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                         QueueLockWrapper swapQueue = swapMap.get(flowFileQueue);
                         if (swapQueue == null) {
                             swapQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>());
-                            QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue);
+                            final QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue);
                             if (oldQueue != null) {
                                 swapQueue = oldQueue;
                             }
@@ -567,7 +585,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
      * @return the largest FlowFile ID that was recovered
      */
     @Override
-    public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ContentClaimManager claimManager) {
+    public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ResourceClaimManager claimManager) {
         final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
             @Override
             public boolean accept(final File dir, final String name) {
@@ -680,6 +698,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
+    @Override
     public void shutdown() {
         swapQueueIdentifierExecutor.shutdownNow();
         swapInExecutor.shutdownNow();

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 3d78b3a..815f97e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -98,9 +98,12 @@ import org.apache.nifi.controller.repository.StandardCounterRepository;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import org.apache.nifi.controller.repository.StandardRepositoryRecord;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
 import org.apache.nifi.controller.scheduling.ProcessContextFactory;
@@ -282,7 +285,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final NodeProtocolSender protocolSender;
 
     private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
-    private final ContentClaimManager contentClaimManager = new StandardContentClaimManager();
+    private final ResourceClaimManager contentClaimManager = new StandardResourceClaimManager();
 
     // guarded by rwLock
     /**
@@ -495,7 +498,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
     }
 
-    private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ContentClaimManager contentClaimManager) {
+    private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) {
         final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: "
@@ -3108,7 +3111,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             @Override
             public boolean isInputAvailable() {
                 try {
-                    return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier()));
+                    return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(),
+                        event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset()));
                 } catch (final IOException e) {
                     return false;
                 }
@@ -3117,43 +3121,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             @Override
             public boolean isOutputAvailable() {
                 try {
-                    return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(), event.getContentClaimIdentifier()));
+                    return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(),
+                        event.getContentClaimIdentifier(), event.getContentClaimOffset()));
                 } catch (final IOException e) {
                     return false;
                 }
             }
 
-            private ContentClaim createClaim(final String container, final String section, final String identifier) {
+            private ContentClaim createClaim(final String container, final String section, final String identifier, final Long offset) {
                 if (container == null || section == null || identifier == null) {
                     return null;
                 }
 
-                return new ContentClaim() {
-                    @Override
-                    public int compareTo(final ContentClaim o) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return identifier;
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return container;
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return section;
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return false;
-                    }
-                };
+                final StandardResourceClaim resourceClaim = new StandardResourceClaim(container, section, identifier, false);
+                return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue());
             }
 
             @Override
@@ -3170,45 +3151,48 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         requireNonNull(requestUri);
 
         final ContentClaim claim;
-        final Long offset;
         final long size;
+        final long offset;
         if (direction == ContentDirection.INPUT) {
             if (provEvent.getPreviousContentClaimContainer() == null || provEvent.getPreviousContentClaimSection() == null || provEvent.getPreviousContentClaimIdentifier() == null) {
                 throw new IllegalArgumentException("Input Content Claim not specified");
             }
 
-            claim = contentClaimManager.newContentClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), provEvent.getPreviousContentClaimIdentifier(), false);
-            offset = provEvent.getPreviousContentClaimOffset();
+            final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
+                provEvent.getPreviousContentClaimIdentifier(), false);
+            claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
+            offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
             size = provEvent.getPreviousFileSize();
         } else {
             if (provEvent.getContentClaimContainer() == null || provEvent.getContentClaimSection() == null || provEvent.getContentClaimIdentifier() == null) {
                 throw new IllegalArgumentException("Output Content Claim not specified");
             }
 
-            claim = contentClaimManager.newContentClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), provEvent.getContentClaimIdentifier(), false);
-            offset = provEvent.getContentClaimOffset();
+            final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
+                provEvent.getContentClaimIdentifier(), false);
+
+            claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
+            offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset();
             size = provEvent.getFileSize();
         }
 
         final InputStream rawStream = contentRepository.read(claim);
-        if (offset != null) {
-            StreamUtils.skip(rawStream, offset.longValue());
-        }
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
 
         // Register a Provenance Event to indicate that we replayed the data.
         final ProvenanceEventRecord sendEvent = new StandardProvenanceEventRecord.Builder()
-                .setEventType(ProvenanceEventType.SEND)
-                .setFlowFileUUID(provEvent.getFlowFileUuid())
-                .setAttributes(provEvent.getAttributes(), Collections.<String, String>emptyMap())
-                .setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), offset, size)
-                .setTransitUri(requestUri)
-                .setEventTime(System.currentTimeMillis())
-                .setFlowFileEntryDate(provEvent.getFlowFileEntryDate())
-                .setLineageStartDate(provEvent.getLineageStartDate())
-                .setComponentType(getName())
-                .setComponentId(getRootGroupId())
-                .setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId())
-                .build();
+            .setEventType(ProvenanceEventType.SEND)
+            .setFlowFileUUID(provEvent.getFlowFileUuid())
+            .setAttributes(provEvent.getAttributes(), Collections.<String, String> emptyMap())
+            .setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, size)
+            .setTransitUri(requestUri)
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(provEvent.getFlowFileEntryDate())
+            .setLineageStartDate(provEvent.getLineageStartDate())
+            .setComponentType(getName())
+            .setComponentId(getRootGroupId())
+            .setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId())
+            .build();
 
         provenanceEventRepository.registerEvent(sendEvent);
 
@@ -3233,7 +3217,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         try {
-            if (!contentRepository.isAccessible(contentClaimManager.newContentClaim(contentClaimContainer, contentClaimSection, contentClaimId, false))) {
+            final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
+            final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
+
+            if (!contentRepository.isAccessible(contentClaim)) {
                 return "Content is no longer available in Content Repository";
             }
         } catch (final IOException ioe) {
@@ -3310,18 +3297,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         // Create the ContentClaim
-        final ContentClaim claim = contentClaimManager.newContentClaim(event.getPreviousContentClaimContainer(),
+        final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
                 event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
 
         // Increment Claimant Count, since we will now be referencing the Content Claim
-        contentClaimManager.incrementClaimantCount(claim);
+        contentClaimManager.incrementClaimantCount(resourceClaim);
+        final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
+        final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, claimOffset);
+        contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize());
 
-        if (!contentRepository.isAccessible(claim)) {
-            contentClaimManager.decrementClaimantCount(claim);
+        if (!contentRepository.isAccessible(contentClaim)) {
+            contentClaimManager.decrementClaimantCount(resourceClaim);
             throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository");
         }
 
-        final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
         final String parentUUID = event.getFlowFileUuid();
 
         // Create the FlowFile Record
@@ -3331,39 +3320,39 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         final String newFlowFileUUID = UUID.randomUUID().toString();
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                // Copy relevant info from source FlowFile
-                .addAttributes(event.getPreviousAttributes())
-                .contentClaim(claim)
-                .contentClaimOffset(claimOffset)
-                .entryDate(System.currentTimeMillis())
-                .id(flowFileRepository.getNextFlowFileSequence())
-                .lineageIdentifiers(lineageIdentifiers)
-                .lineageStartDate(event.getLineageStartDate())
-                .size(contentSize.longValue())
-                // Create a new UUID and add attributes indicating that this is a replay
-                .addAttribute("flowfile.replay", "true")
-                .addAttribute("flowfile.replay.timestamp", String.valueOf(new Date()))
-                .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID)
-                // remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile
-                .removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key())
-                // build the record
-                .build();
+            // Copy relevant info from source FlowFile
+            .addAttributes(event.getPreviousAttributes())
+            .contentClaim(contentClaim)
+            .contentClaimOffset(0L) // use 0 because we used the content claim offset in the Content Claim itself
+            .entryDate(System.currentTimeMillis())
+            .id(flowFileRepository.getNextFlowFileSequence())
+            .lineageIdentifiers(lineageIdentifiers)
+            .lineageStartDate(event.getLineageStartDate())
+            .size(contentSize.longValue())
+            // Create a new UUID and add attributes indicating that this is a replay
+            .addAttribute("flowfile.replay", "true")
+            .addAttribute("flowfile.replay.timestamp", String.valueOf(new Date()))
+            .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID)
+            // remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile
+            .removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key())
+            // build the record
+            .build();
 
         // Register a Provenance Event to indicate that we replayed the data.
         final ProvenanceEventRecord replayEvent = new StandardProvenanceEventRecord.Builder()
-                .setEventType(ProvenanceEventType.REPLAY)
-                .addChildUuid(newFlowFileUUID)
-                .addParentUuid(parentUUID)
-                .setFlowFileUUID(parentUUID)
-                .setAttributes(Collections.<String, String>emptyMap(), flowFileRecord.getAttributes())
-                .setCurrentContentClaim(event.getContentClaimSection(), event.getContentClaimContainer(), event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize())
-                .setDetails("Replay requested by " + requestor)
-                .setEventTime(System.currentTimeMillis())
-                .setFlowFileEntryDate(System.currentTimeMillis())
-                .setLineageStartDate(event.getLineageStartDate())
-                .setComponentType(event.getComponentType())
-                .setComponentId(event.getComponentId())
-                .build();
+            .setEventType(ProvenanceEventType.REPLAY)
+            .addChildUuid(newFlowFileUUID)
+            .addParentUuid(parentUUID)
+            .setFlowFileUUID(parentUUID)
+            .setAttributes(Collections.<String, String> emptyMap(), flowFileRecord.getAttributes())
+            .setCurrentContentClaim(event.getContentClaimSection(), event.getContentClaimContainer(), event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize())
+            .setDetails("Replay requested by " + requestor)
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(System.currentTimeMillis())
+            .setLineageStartDate(event.getLineageStartDate())
+            .setComponentType(event.getComponentType())
+            .setComponentId(event.getComponentId())
+            .build();
         provenanceEventRepository.registerEvent(replayEvent);
 
         // Update the FlowFile Repository to indicate that we have added the FlowFile to the flow


[2/3] nifi git commit: NIFI-744: Refactored ContentClaim into ContentClaim and ResourceClaim so that we can append to a single file in the FileSystemRepository even after a session is completed

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 1171636..3a6338c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -23,14 +23,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
-import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
@@ -61,18 +58,20 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.controller.repository.io.SyncOnCloseOutputStream;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.LongHolder;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.StopWatch;
-
-import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.util.file.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,8 +91,11 @@ public class FileSystemRepository implements ContentRepository {
     private final AtomicLong index;
 
     private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true);
-    private final ConcurrentMap<String, BlockingQueue<ContentClaim>> reclaimable = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, BlockingQueue<ResourceClaim>> reclaimable = new ConcurrentHashMap<>();
     private final Map<String, ContainerState> containerStateMap = new HashMap<>();
+    private final long maxAppendClaimLength = 1024L * 1024L; // 1 MB
+    private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100);
+    private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100);
 
     private final boolean archiveData;
     private final long maxArchiveMillis;
@@ -101,7 +103,7 @@ public class FileSystemRepository implements ContentRepository {
     private final boolean alwaysSync;
     private final ScheduledExecutorService containerCleanupExecutor;
 
-    private ContentClaimManager contentClaimManager; // effectively final
+    private ResourceClaimManager resourceClaimManager; // effectively final
 
     // Map of contianer to archived files that should be deleted next.
     private final Map<String, BlockingQueue<ArchiveInfo>> archivedFiles = new HashMap<>();
@@ -113,7 +115,7 @@ public class FileSystemRepository implements ContentRepository {
         final NiFiProperties properties = NiFiProperties.getInstance();
         // determine the file repository paths and ensure they exist
         final Map<String, Path> fileRespositoryPaths = properties.getContentRepositoryPaths();
-        for (Path path : fileRespositoryPaths.values()) {
+        for (final Path path : fileRespositoryPaths.values()) {
             Files.createDirectories(path);
         }
 
@@ -122,7 +124,7 @@ public class FileSystemRepository implements ContentRepository {
         index = new AtomicLong(0L);
 
         for (final String containerName : containerNames) {
-            reclaimable.put(containerName, new LinkedBlockingQueue<ContentClaim>(10000));
+            reclaimable.put(containerName, new LinkedBlockingQueue<ResourceClaim>(10000));
             archivedFiles.put(containerName, new LinkedBlockingQueue<ArchiveInfo>(100000));
         }
 
@@ -196,8 +198,8 @@ public class FileSystemRepository implements ContentRepository {
     }
 
     @Override
-    public void initialize(final ContentClaimManager claimManager) {
-        this.contentClaimManager = claimManager;
+    public void initialize(final ResourceClaimManager claimManager) {
+        this.resourceClaimManager = claimManager;
 
         final NiFiProperties properties = NiFiProperties.getInstance();
 
@@ -231,6 +233,13 @@ public class FileSystemRepository implements ContentRepository {
     public void shutdown() {
         executor.shutdown();
         containerCleanupExecutor.shutdown();
+
+        for (final OutputStream out : writableClaimStreams.values()) {
+            try {
+                out.close();
+            } catch (final IOException ioe) {
+            }
+        }
     }
 
     private static double getRatio(final String value) {
@@ -397,8 +406,8 @@ public class FileSystemRepository implements ContentRepository {
         final String id = idPath.toFile().getName();
         final String sectionName = sectionPath.toFile().getName();
 
-        final ContentClaim contentClaim = contentClaimManager.newContentClaim(containerName, sectionName, id, false);
-        if (contentClaimManager.getClaimantCount(contentClaim) == 0) {
+        final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false);
+        if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) {
             removeIncompleteContent(fileToRemove);
         }
     }
@@ -427,15 +436,21 @@ public class FileSystemRepository implements ContentRepository {
     }
 
     private Path getPath(final ContentClaim claim) {
-        final Path containerPath = containers.get(claim.getContainer());
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+        return getPath(resourceClaim);
+    }
+
+    private Path getPath(final ResourceClaim resourceClaim) {
+        final Path containerPath = containers.get(resourceClaim.getContainer());
         if (containerPath == null) {
             return null;
         }
-        return containerPath.resolve(claim.getSection()).resolve(claim.getId());
+        return containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
     }
 
     private Path getPath(final ContentClaim claim, final boolean verifyExists) throws ContentNotFoundException {
-        final Path containerPath = containers.get(claim.getContainer());
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+        final Path containerPath = containers.get(resourceClaim.getContainer());
         if (containerPath == null) {
             if (verifyExists) {
                 throw new ContentNotFoundException(claim);
@@ -445,11 +460,11 @@ public class FileSystemRepository implements ContentRepository {
         }
 
         // Create the Path that points to the data
-        Path resolvedPath = containerPath.resolve(claim.getSection()).resolve(String.valueOf(claim.getId()));
+        Path resolvedPath = containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
 
         // If the data does not exist, create a Path that points to where the data would exist in the archive directory.
         if (!Files.exists(resolvedPath)) {
-            resolvedPath = getArchivePath(claim);
+            resolvedPath = getArchivePath(claim.getResourceClaim());
         }
 
         if (verifyExists && !Files.exists(resolvedPath)) {
@@ -460,34 +475,55 @@ public class FileSystemRepository implements ContentRepository {
 
     @Override
     public ContentClaim create(final boolean lossTolerant) throws IOException {
-        final long currentIndex = index.incrementAndGet();
-
-        String containerName = null;
-        boolean waitRequired = true;
-        ContainerState containerState = null;
-        for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) {
-            final long modulatedContainerIndex = containerIndex % containers.size();
-            containerName = containerNames.get((int) modulatedContainerIndex);
-
-            containerState = containerStateMap.get(containerName);
-            if (!containerState.isWaitRequired()) {
-                waitRequired = false;
-                break;
-            }
-        }
+        ResourceClaim resourceClaim;
+
+        // We need to synchronize on this queue because the act of pulling something off
+        // the queue and incrementing the associated claimant count MUST be done atomically.
+        // This way, if the claimant count is decremented to 0, we can ensure that the
+        // claim is not then pulled from the queue and used as another thread is destroying/archiving
+        // the claim.
+        final long resourceOffset;
+        synchronized (writableClaimQueue) {
+            final ClaimLengthPair pair = writableClaimQueue.poll();
+            if (pair == null) {
+                final long currentIndex = index.incrementAndGet();
+
+                String containerName = null;
+                boolean waitRequired = true;
+                ContainerState containerState = null;
+                for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) {
+                    final long modulatedContainerIndex = containerIndex % containers.size();
+                    containerName = containerNames.get((int) modulatedContainerIndex);
+
+                    containerState = containerStateMap.get(containerName);
+                    if (!containerState.isWaitRequired()) {
+                        waitRequired = false;
+                        break;
+                    }
+                }
 
-        if (waitRequired) {
-            containerState.waitForArchiveExpiration();
-        }
+                if (waitRequired) {
+                    containerState.waitForArchiveExpiration();
+                }
 
-        final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER;
-        final String section = String.valueOf(modulatedSectionIndex);
-        final String claimId = System.currentTimeMillis() + "-" + currentIndex;
+                final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER;
+                final String section = String.valueOf(modulatedSectionIndex);
+                final String claimId = System.currentTimeMillis() + "-" + currentIndex;
 
-        final ContentClaim claim = contentClaimManager.newContentClaim(containerName, section, claimId, lossTolerant);
-        contentClaimManager.incrementClaimantCount(claim, true);
+                resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant);
+                resourceOffset = 0L;
+                LOG.debug("Creating new Resource Claim {}", resourceClaim);
+            } else {
+                resourceClaim = pair.getClaim();
+                resourceOffset = pair.getLength();
+                LOG.debug("Reusing Resource Claim {}", resourceClaim);
+            }
+
+            resourceClaimManager.incrementClaimantCount(resourceClaim, true);
+        }
 
-        return claim;
+        final StandardContentClaim scc = new StandardContentClaim(resourceClaim, resourceOffset);
+        return scc;
     }
 
     @Override
@@ -496,7 +532,7 @@ public class FileSystemRepository implements ContentRepository {
             return 0;
         }
 
-        return contentClaimManager.incrementClaimantCount(claim);
+        return resourceClaimManager.incrementClaimantCount(claim.getResourceClaim());
     }
 
     @Override
@@ -504,7 +540,7 @@ public class FileSystemRepository implements ContentRepository {
         if (claim == null) {
             return 0;
         }
-        return contentClaimManager.getClaimantCount(claim);
+        return resourceClaimManager.getClaimantCount(claim.getResourceClaim());
     }
 
     @Override
@@ -513,7 +549,7 @@ public class FileSystemRepository implements ContentRepository {
             return 0;
         }
 
-        final int claimantCount = contentClaimManager.decrementClaimantCount(claim);
+        final int claimantCount = resourceClaimManager.decrementClaimantCount(claim.getResourceClaim());
         return claimantCount;
     }
 
@@ -523,9 +559,30 @@ public class FileSystemRepository implements ContentRepository {
             return false;
         }
 
+        return remove(claim.getResourceClaim());
+    }
+
+    private boolean remove(final ResourceClaim claim) {
+        if (claim == null) {
+            return false;
+        }
+
+        // we synchronize on the queue here because if the claimant count is 0,
+        // we need to be able to remove any instance of that resource claim from the
+        // queue atomically (i.e., the checking of the claimant count plus removal from the queue
+        // must be atomic)
+        synchronized (writableClaimQueue) {
+            final int claimantCount = resourceClaimManager.getClaimantCount(claim);
+            if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+                // if other content claims are claiming the same resource, we have nothing to destroy,
+                // so just consider the destruction successful.
+                return true;
+            }
+        }
+
         Path path = null;
         try {
-            path = getPath(claim, false);
+            path = getPath(claim);
         } catch (final ContentNotFoundException cnfe) {
         }
 
@@ -538,6 +595,7 @@ public class FileSystemRepository implements ContentRepository {
         return true;
     }
 
+
     @Override
     public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException {
         if (original == null) {
@@ -545,14 +603,11 @@ public class FileSystemRepository implements ContentRepository {
         }
 
         final ContentClaim newClaim = create(lossTolerant);
-        final Path currPath = getPath(original, true);
-        final Path newPath = getPath(newClaim);
-        try (final FileOutputStream fos = new FileOutputStream(newPath.toFile())) {
-            Files.copy(currPath, fos);
-            if (alwaysSync) {
-                fos.getFD().sync();
-            }
+        try (final InputStream in = read(original);
+            final OutputStream out = write(newClaim)) {
+            StreamUtils.copy(in, out);
         } catch (final IOException ioe) {
+            decrementClaimantCount(newClaim);
             remove(newClaim);
             throw ioe;
         }
@@ -564,44 +619,28 @@ public class FileSystemRepository implements ContentRepository {
         if (claims.contains(destination)) {
             throw new IllegalArgumentException("destination cannot be within claims");
         }
-        try (final FileChannel dest = FileChannel.open(getPath(destination), StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
-            long position = 0L;
-            if (header != null && header.length > 0) {
-                final ByteBuffer buffer = ByteBuffer.wrap(header);
-                while (buffer.hasRemaining()) {
-                    position += dest.write(buffer, position);
-                }
+
+        try (final ByteCountingOutputStream out = new ByteCountingOutputStream(write(destination))) {
+            if (header != null) {
+                out.write(header);
             }
-            int objectIndex = 0;
+
+            int i = 0;
             for (final ContentClaim claim : claims) {
-                long totalCopied = 0L;
-                try (final FileChannel src = FileChannel.open(getPath(claim, true), StandardOpenOption.READ)) {
-                    while (totalCopied < src.size()) {
-                        final long copiedThisIteration = dest.transferFrom(src, position, Long.MAX_VALUE);
-                        totalCopied += copiedThisIteration;
-                        position += copiedThisIteration;
-                    }
+                try (final InputStream in = read(claim)) {
+                    StreamUtils.copy(in, out);
                 }
-                // don't add demarcator after the last claim
-                if (demarcator != null && demarcator.length > 0 && (++objectIndex < claims.size())) {
-                    final ByteBuffer buffer = ByteBuffer.wrap(demarcator);
-                    while (buffer.hasRemaining()) {
-                        position += dest.write(buffer, position);
-                    }
-                }
-            }
-            if (footer != null && footer.length > 0) {
-                final ByteBuffer buffer = ByteBuffer.wrap(footer);
-                while (buffer.hasRemaining()) {
-                    position += dest.write(buffer, position);
+
+                if (++i < claims.size() && demarcator != null) {
+                    out.write(demarcator);
                 }
             }
 
-            if (alwaysSync) {
-                dest.force(true);
+            if (footer != null) {
+                out.write(footer);
             }
 
-            return position;
+            return out.getBytesWritten();
         }
     }
 
@@ -624,12 +663,8 @@ public class FileSystemRepository implements ContentRepository {
 
     @Override
     public long importFrom(final InputStream content, final ContentClaim claim, final boolean append) throws IOException {
-        try (final FileOutputStream out = new FileOutputStream(getPath(claim).toFile(), append)) {
-            final long copied = StreamUtils.copy(content, out);
-            if (alwaysSync) {
-                out.getFD().sync();
-            }
-            return copied;
+        try (final OutputStream out = write(claim, append)) {
+            return StreamUtils.copy(content, out);
         }
     }
 
@@ -642,20 +677,14 @@ public class FileSystemRepository implements ContentRepository {
             Files.createFile(destination);
             return 0L;
         }
-        if (append) {
-            try (final FileChannel sourceChannel = FileChannel.open(getPath(claim, true), StandardOpenOption.READ);
-                    final FileChannel destinationChannel = FileChannel.open(destination, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
-                long position = destinationChannel.size();
-                final long targetSize = position + sourceChannel.size();
-                while (position < targetSize) {
-                    final long bytesCopied = destinationChannel.transferFrom(sourceChannel, position, Long.MAX_VALUE);
-                    position += bytesCopied;
-                }
-                return position;
+
+        try (final InputStream in = read(claim);
+            final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) {
+            final long copied = StreamUtils.copy(in, fos);
+            if (alwaysSync) {
+                fos.getFD().sync();
             }
-        } else {
-            Files.copy(getPath(claim, true), destination, StandardCopyOption.REPLACE_EXISTING);
-            return Files.size(destination);
+            return copied;
         }
     }
 
@@ -674,28 +703,20 @@ public class FileSystemRepository implements ContentRepository {
 
         final long claimSize = size(claim);
         if (offset > claimSize) {
-            throw new IllegalArgumentException("offset of " + offset + " exceeds claim size of " + claimSize);
+            throw new IllegalArgumentException("Offset of " + offset + " exceeds claim size of " + claimSize);
 
         }
 
-        if (append) {
-            try (final InputStream sourceStream = Files.newInputStream(getPath(claim, true), StandardOpenOption.READ);
-                    final OutputStream destinationStream = Files.newOutputStream(destination, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
-                StreamUtils.skip(sourceStream, offset);
-
-                final byte[] buffer = new byte[8192];
-                int len;
-                long copied = 0L;
-                while ((len = sourceStream.read(buffer, 0, (int) Math.min(length - copied, buffer.length))) > 0) {
-                    destinationStream.write(buffer, 0, len);
-                    copied += len;
-                }
-                return copied;
+        try (final InputStream in = read(claim);
+            final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) {
+            if (offset > 0) {
+                StreamUtils.skip(in, offset);
             }
-        } else {
-            try (final OutputStream out = Files.newOutputStream(destination, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
-                return exportTo(claim, out, offset, length);
+            StreamUtils.copy(in, fos, length);
+            if (alwaysSync) {
+                fos.getFD().sync();
             }
+            return length;
         }
     }
 
@@ -704,7 +725,10 @@ public class FileSystemRepository implements ContentRepository {
         if (claim == null) {
             return 0L;
         }
-        return Files.copy(getPath(claim, true), destination);
+
+        try (final InputStream in = read(claim)) {
+            return StreamUtils.copy(in, destination);
+        }
     }
 
     @Override
@@ -719,7 +743,7 @@ public class FileSystemRepository implements ContentRepository {
         if (offset == 0 && length == claimSize) {
             return exportTo(claim, destination);
         }
-        try (final InputStream in = Files.newInputStream(getPath(claim, true))) {
+        try (final InputStream in = read(claim)) {
             StreamUtils.skip(in, offset);
             final byte[] buffer = new byte[8192];
             int len;
@@ -738,7 +762,12 @@ public class FileSystemRepository implements ContentRepository {
             return 0L;
         }
 
-        return Files.size(getPath(claim, true));
+        // see javadocs for claim.getLength() as to why we do this.
+        if (claim.getLength() < 0) {
+            return Files.size(getPath(claim, true));
+        }
+
+        return claim.getLength();
     }
 
     @Override
@@ -747,16 +776,198 @@ public class FileSystemRepository implements ContentRepository {
             return new ByteArrayInputStream(new byte[0]);
         }
         final Path path = getPath(claim, true);
-        return new FileInputStream(path.toFile());
+        final FileInputStream fis = new FileInputStream(path.toFile());
+        if (claim.getOffset() > 0L) {
+            StreamUtils.skip(fis, claim.getOffset());
+        }
+
+        // see javadocs for claim.getLength() as to why we do this.
+        if (claim.getLength() >= 0) {
+            return new LimitedInputStream(fis, claim.getLength());
+        } else {
+            return fis;
+        }
     }
 
     @Override
-    @SuppressWarnings("resource")
     public OutputStream write(final ContentClaim claim) throws IOException {
-        final FileOutputStream fos = new FileOutputStream(getPath(claim).toFile());
-        return alwaysSync ? new SyncOnCloseOutputStream(fos) : fos;
+        return write(claim, false);
     }
 
+    private OutputStream write(final ContentClaim claim, final boolean append) throws IOException {
+        if (claim == null) {
+            throw new NullPointerException("ContentClaim cannot be null");
+        }
+
+        if (!(claim instanceof StandardContentClaim)) {
+            // we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything
+            // else, just throw an Exception because it is not valid for this Repository
+            throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does belong to this Content Repository");
+        }
+
+        final StandardContentClaim scc = (StandardContentClaim) claim;
+
+        // we always append because there may be another ContentClaim using the same resource claim.
+        // However, we know that we will never write to the same claim from two different threads
+        // at the same time because we will call create() to get the claim before we write to it,
+        // and when we call create(), it will remove it from the Queue, which means that no other
+        // thread will get the same Claim until we've finished writing to it.
+        ByteCountingOutputStream claimStream = writableClaimStreams.remove(scc.getResourceClaim());
+        final long initialLength;
+        if (claimStream == null) {
+            final File file = getPath(scc).toFile();
+            claimStream = new ByteCountingOutputStream(new FileOutputStream(file, true), file.length());
+            initialLength = 0L;
+        } else {
+            if (append) {
+                initialLength = Math.max(0, scc.getLength());
+            } else {
+                initialLength = 0;
+                scc.setOffset(claimStream.getBytesWritten());
+            }
+        }
+
+        final ByteCountingOutputStream bcos = claimStream;
+        final OutputStream out = new OutputStream() {
+            private long bytesWritten = 0L;
+            private boolean recycle = true;
+            private boolean closed = false;
+
+            @Override
+            public String toString() {
+                return "FileSystemRepository Stream [" + scc + "]";
+            }
+
+            @Override
+            public synchronized void write(final int b) throws IOException {
+                if (closed) {
+                    throw new IOException("Stream is closed");
+                }
+
+                try {
+                    bcos.write(b);
+                } catch (final IOException ioe) {
+                    recycle = false;
+                    throw new IOException("Failed to write to " + this, ioe);
+                }
+
+                bytesWritten++;
+                scc.setLength(bytesWritten + initialLength);
+            }
+
+            @Override
+            public synchronized void write(final byte[] b) throws IOException {
+                if (closed) {
+                    throw new IOException("Stream is closed");
+                }
+
+                try {
+                    bcos.write(b);
+                } catch (final IOException ioe) {
+                    recycle = false;
+                    throw new IOException("Failed to write to " + this, ioe);
+                }
+
+                bytesWritten += b.length;
+                scc.setLength(bytesWritten + initialLength);
+            }
+
+            @Override
+            public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
+                if (closed) {
+                    throw new IOException("Stream is closed");
+                }
+
+                try {
+                    bcos.write(b, off, len);
+                } catch (final IOException ioe) {
+                    recycle = false;
+                    throw new IOException("Failed to write to " + this, ioe);
+                }
+
+                bytesWritten += len;
+                scc.setLength(bytesWritten + initialLength);
+            }
+
+            @Override
+            public synchronized void flush() throws IOException {
+                if (closed) {
+                    throw new IOException("Stream is closed");
+                }
+
+                bcos.flush();
+            }
+
+            @Override
+            public synchronized void close() throws IOException {
+                closed = true;
+
+                if (alwaysSync) {
+                    ((FileOutputStream) bcos.getWrappedStream()).getFD().sync();
+                }
+
+                if (scc.getLength() < 0) {
+                    // If claim was not written to, set length to 0
+                    scc.setLength(0L);
+                }
+
+                // if we've not yet hit the threshold for appending to a resource claim, add the claim
+                // to the writableClaimQueue so that the Resource Claim can be used again when create()
+                // is called. In this case, we don't have to actually close the file stream. Instead, we
+                // can just add it onto the queue and continue to use it for the next content claim.
+                final long resourceClaimLength = scc.getOffset() + scc.getLength();
+                if (recycle && resourceClaimLength < maxAppendClaimLength) {
+                    // we do not have to synchronize on the writable claim queue here because we
+                    // are only adding something to the queue. We must synchronize if we are
+                    // using a ResourceClaim from the queue and incrementing the claimant count on that resource
+                    // because those need to be done atomically, or if we are destroying a claim that is on
+                    // the queue because we need to ensure that the latter operation does not cause problems
+                    // with the former.
+                    final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength);
+                    final boolean enqueued;
+                    if (writableClaimQueue.contains(pair)) {
+                        // may already exist on the queue, if the content claim is written to multiple times.
+                        enqueued = true;
+                    } else {
+                        enqueued = writableClaimQueue.offer(pair);
+                    }
+
+                    if (enqueued) {
+                        writableClaimStreams.put(scc.getResourceClaim(), bcos);
+                        LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
+                    } else {
+                        writableClaimStreams.remove(scc.getResourceClaim());
+                        bcos.close();
+
+                        LOG.debug("Claim length less than max; Closing {}", this);
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
+                        }
+                    }
+                } else {
+                    // we've reached the limit for this claim. Don't add it back to our queue.
+                    // Instead, just remove it and move on.
+                    writableClaimStreams.remove(scc.getResourceClaim());
+                    // ensure that the claim is no longer on the queue
+                    writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
+                    bcos.close();
+                    LOG.debug("Claim lenth >= max; Closing {}", this);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
+                    }
+                }
+            }
+        };
+
+        LOG.debug("Writing to {}", out);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for writing to " + out));
+        }
+
+        return out;
+    }
+
+
     @Override
     public void purge() {
         // delete all content from repositories
@@ -788,7 +999,7 @@ public class FileSystemRepository implements ContentRepository {
             }
         }
 
-        contentClaimManager.purge();
+        resourceClaimManager.purge();
     }
 
     private class BinDestructableClaims implements Runnable {
@@ -800,17 +1011,17 @@ public class FileSystemRepository implements ContentRepository {
                 // because the Container generally maps to a physical partition on the disk, so we want a few
                 // different threads hitting the different partitions but don't want multiple threads hitting
                 // the same partition.
-                final List<ContentClaim> toDestroy = new ArrayList<>();
+                final List<ResourceClaim> toDestroy = new ArrayList<>();
                 while (true) {
                     toDestroy.clear();
-                    contentClaimManager.drainDestructableClaims(toDestroy, 10000);
+                    resourceClaimManager.drainDestructableClaims(toDestroy, 10000);
                     if (toDestroy.isEmpty()) {
                         return;
                     }
 
-                    for (final ContentClaim claim : toDestroy) {
+                    for (final ResourceClaim claim : toDestroy) {
                         final String container = claim.getContainer();
-                        final BlockingQueue<ContentClaim> claimQueue = reclaimable.get(container);
+                        final BlockingQueue<ResourceClaim> claimQueue = reclaimable.get(container);
 
                         try {
                             while (true) {
@@ -838,7 +1049,7 @@ public class FileSystemRepository implements ContentRepository {
         return sectionPath.resolve(ARCHIVE_DIR_NAME).resolve(claimId);
     }
 
-    private Path getArchivePath(final ContentClaim claim) {
+    private Path getArchivePath(final ResourceClaim claim) {
         final String claimId = claim.getId();
         final Path containerPath = containers.get(claim.getContainer());
         final Path archivePath = containerPath.resolve(claim.getSection()).resolve(ARCHIVE_DIR_NAME).resolve(claimId);
@@ -859,22 +1070,28 @@ public class FileSystemRepository implements ContentRepository {
             return true;
         }
 
-        return Files.exists(getArchivePath(contentClaim));
+        return Files.exists(getArchivePath(contentClaim.getResourceClaim()));
     }
 
-    private void archive(final ContentClaim contentClaim) throws IOException {
+    private void archive(final ResourceClaim claim) throws IOException {
         if (!archiveData) {
             return;
         }
 
-        final int claimantCount = getClaimantCount(contentClaim);
-        if (claimantCount > 0) {
-            throw new IllegalStateException("Cannot archive ContentClaim " + contentClaim + " because it is currently in use");
+        synchronized (writableClaimQueue) {
+            final int claimantCount = claim == null ? 0 : resourceClaimManager.getClaimantCount(claim);
+            if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+                return;
+            }
+        }
+
+        final Path curPath = getPath(claim);
+        if (curPath == null) {
+            return;
         }
 
-        final Path curPath = getPath(contentClaim, true);
         archive(curPath);
-        LOG.debug("Successfully moved {} to archive", contentClaim);
+        LOG.debug("Successfully moved {} to archive", claim);
     }
 
     private void archive(final Path curPath) throws IOException {
@@ -1107,8 +1324,8 @@ public class FileSystemRepository implements ContentRepository {
                 while (true) {
                     // look through each of the binned queues of Content Claims
                     int successCount = 0;
-                    final List<ContentClaim> toRemove = new ArrayList<>();
-                    for (final Map.Entry<String, BlockingQueue<ContentClaim>> entry : reclaimable.entrySet()) {
+                    final List<ResourceClaim> toRemove = new ArrayList<>();
+                    for (final Map.Entry<String, BlockingQueue<ResourceClaim>> entry : reclaimable.entrySet()) {
                         // drain the queue of all ContentClaims that can be destroyed for the given container.
                         final String container = entry.getKey();
                         final ContainerState containerState = containerStateMap.get(container);
@@ -1121,7 +1338,7 @@ public class FileSystemRepository implements ContentRepository {
 
                         // destroy each claim for this container
                         final long start = System.nanoTime();
-                        for (final ContentClaim claim : toRemove) {
+                        for (final ResourceClaim claim : toRemove) {
                             if (archiveData) {
                                 try {
                                     archive(claim);
@@ -1210,7 +1427,7 @@ public class FileSystemRepository implements ContentRepository {
         @Override
         public void run() {
             try {
-                if (oldestArchiveDate.get() > (System.currentTimeMillis() - maxArchiveMillis)) {
+                if (oldestArchiveDate.get() > System.currentTimeMillis() - maxArchiveMillis) {
                     final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName);
                     if (minRequiredSpace == null) {
                         return;
@@ -1245,7 +1462,7 @@ public class FileSystemRepository implements ContentRepository {
                 if (oldestContainerArchive < 0L) {
                     boolean updated;
                     do {
-                        long oldest = oldestArchiveDate.get();
+                        final long oldest = oldestArchiveDate.get();
                         if (oldestContainerArchive < oldest) {
                             updated = oldestArchiveDate.compareAndSet(oldest, oldestContainerArchive);
 
@@ -1298,7 +1515,7 @@ public class FileSystemRepository implements ContentRepository {
                     final long free = getContainerUsableSpace(containerName);
                     used = capacity - free;
                     bytesUsed = used;
-                } catch (IOException e) {
+                } catch (final IOException e) {
                     return false;
                 }
             }
@@ -1317,7 +1534,7 @@ public class FileSystemRepository implements ContentRepository {
                     try {
                         LOG.info("Unable to write to container {} due to archive file size constraints; waiting for archive cleanup", containerName);
                         condition.await();
-                    } catch (InterruptedException e) {
+                    } catch (final InterruptedException e) {
                     }
                 }
             } finally {
@@ -1355,4 +1572,55 @@ public class FileSystemRepository implements ContentRepository {
         }
     }
 
+
+    private static class ClaimLengthPair {
+        private final ResourceClaim claim;
+        private final Long length;
+
+        public ClaimLengthPair(final ResourceClaim claim, final Long length) {
+            this.claim = claim;
+            this.length = length;
+        }
+
+        public ResourceClaim getClaim() {
+            return claim;
+        }
+
+        public Long getLength() {
+            return length;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (claim == null ? 0 : claim.hashCode());
+            return result;
+        }
+
+        /**
+         * Equality is determined purely by the ResourceClaim's equality
+         *
+         * @param obj the object to compare against
+         * @return -1, 0, or +1 according to the contract of Object.equals
+         */
+        @Override
+        public boolean equals(final Object obj) {
+            if (this == obj) {
+                return true;
+            }
+
+            if (obj == null) {
+                return false;
+            }
+
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+
+            final ClaimLengthPair other = (ClaimLengthPair) obj;
+            return claim.equals(other.getClaim());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
index 6524cd3..cc8c734 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
@@ -95,7 +95,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
 
     @Override
     public boolean isPenalized() {
-        return (penaltyExpirationMs > 0) ? penaltyExpirationMs > System.currentTimeMillis() : false;
+        return penaltyExpirationMs > 0 ? penaltyExpirationMs > System.currentTimeMillis() : false;
     }
 
     @Override
@@ -150,7 +150,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
     public String toString() {
         final ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
         builder.append("uuid", getAttribute(CoreAttributes.UUID.key()));
-        builder.append("claim", claim == null ? "" : claim.getId());
+        builder.append("claim", claim == null ? "" : claim.toString());
         builder.append("offset", claimOffset);
         builder.append("name", getAttribute(CoreAttributes.FILENAME.key())).append("size", size);
         return builder.toString();
@@ -169,7 +169,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
         private final Set<String> bLineageIdentifiers = new HashSet<>();
         private long bPenaltyExpirationMs = -1L;
         private long bSize = 0L;
-        private Map<String, String> bAttributes = new HashMap<>();
+        private final Map<String, String> bAttributes = new HashMap<>();
         private ContentClaim bClaim = null;
         private long bClaimOffset = 0L;
         private long bLastQueueDate = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 04e819e..2ce48fa 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -44,6 +44,7 @@ import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
 import org.apache.nifi.controller.repository.io.ByteCountingOutputStream;
 import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
@@ -54,10 +55,6 @@ import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.controller.repository.io.LongHolder;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.NonCloseableInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.QueueSize;
@@ -75,7 +72,8 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,10 +90,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     // determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback
     public static final int VERBOSE_LOG_THRESHOLD = 10;
-    private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize(
-        NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue();
-    private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim();
-
     public static final String DEFAULT_FLOWFILE_PATH = "./";
 
     private static final Logger LOG = LoggerFactory.getLogger(StandardProcessSession.class);
@@ -126,11 +120,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     private long contentSizeIn = 0L, contentSizeOut = 0L;
     private int writeRecursionLevel = 0;
 
-    private ContentClaim currentWriteClaim = null;
-    private OutputStream currentWriteClaimStream = null;
-    private long currentWriteClaimSize = 0L;
-    private int currentWriteClaimFlowFileCount = 0;
-
     private ContentClaim currentReadClaim = null;
     private ByteCountingInputStream currentReadClaimStream = null;
     private long processingStartTime;
@@ -690,12 +679,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         if (originalClaim == null) {
             builder.setCurrentContentClaim(null, null, null, null, 0L);
         } else {
+            final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
             builder.setCurrentContentClaim(
-                originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
-                );
+                resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+                repoRecord.getOriginal().getContentClaimOffset() + originalClaim.getOffset(), repoRecord.getOriginal().getSize());
             builder.setPreviousContentClaim(
-                originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
-                );
+                resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+                repoRecord.getOriginal().getContentClaimOffset() + originalClaim.getOffset(), repoRecord.getOriginal().getSize());
         }
     }
 
@@ -711,14 +701,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final ContentClaim currentClaim = repoRecord.getCurrentClaim();
             final long currentOffset = repoRecord.getCurrentClaimOffset();
             final long size = flowFile.getSize();
-            recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size);
+
+            final ResourceClaim resourceClaim = currentClaim.getResourceClaim();
+            recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size);
         }
 
         if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
             final ContentClaim originalClaim = repoRecord.getOriginalClaim();
             final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
             final long originalSize = repoRecord.getOriginal().getSize();
-            recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize);
+
+            final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
+            recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), originalOffset + originalClaim.getOffset(), originalSize);
         }
 
         final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
@@ -741,14 +735,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 final ContentClaim currentClaim = repoRecord.getCurrentClaim();
                 final long currentOffset = repoRecord.getCurrentClaimOffset();
                 final long size = eventFlowFile.getSize();
-                recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size);
+
+                final ResourceClaim resourceClaim = currentClaim.getResourceClaim();
+                recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size);
             }
 
             if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
                 final ContentClaim originalClaim = repoRecord.getOriginalClaim();
                 final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
                 final long originalSize = repoRecord.getOriginal().getSize();
-                recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize);
+
+                final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
+                recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), originalOffset + originalClaim.getOffset(), originalSize);
             }
 
             final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
@@ -1650,8 +1648,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
                             final ContentClaim claim = record.getContentClaim();
                             if (claim != null) {
-                                enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
-                                enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
+                                final ResourceClaim resourceClaim = claim.getResourceClaim();
+                                enriched.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+                                    record.getContentClaimOffset() + claim.getOffset(), record.getSize());
+                                enriched.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+                                    record.getContentClaimOffset() + claim.getOffset(), record.getSize());
                             }
 
                             enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
@@ -1695,7 +1696,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                             StreamUtils.skip(currentReadClaimStream, bytesToSkip);
                         }
 
-                        return new NonCloseableInputStream(currentReadClaimStream);
+                        return new DisableOnCloseInputStream(currentReadClaimStream);
                     }
                 }
 
@@ -1711,7 +1712,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
                 // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
                 // reuse the same InputStream for the next FlowFile
-                return new NonCloseableInputStream(currentReadClaimStream);
+                return new DisableOnCloseInputStream(currentReadClaimStream);
             } else {
                 final InputStream rawInStream = context.getContentRepository().read(claim);
                 StreamUtils.skip(rawInStream, offset);
@@ -1862,30 +1863,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         return newFile;
     }
 
-    private void enforceCurrentWriteClaimState() {
-        if (currentWriteClaimFlowFileCount > MAX_FLOWFILES_PER_CLAIM || currentWriteClaimSize > MAX_APPENDABLE_CLAIM_SIZE) {
-            resetWriteClaims();
-        }
-
-        if (currentWriteClaimStream == null) {
-            try {
-                currentWriteClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} to enforce Current Write Claim State for {}", currentWriteClaim, context.getConnectable());
-            } catch (final IOException e) {
-                throw new FlowFileHandlingException("Unable to create ContentClaim due to " + e.toString(), e);
-            }
-
-            try {
-                currentWriteClaimStream = context.getContentRepository().write(currentWriteClaim);
-            } catch (final IOException e) {
-                resetWriteClaims();
-                throw new FlowFileAccessException("Unable to obtain stream for writing to Content Repostiory: " + e, e);
-            }
-        } else {
-            context.getContentRepository().incrementClaimaintCount(currentWriteClaim);
-        }
-    }
-
     private void ensureNotAppending(final ContentClaim claim) throws IOException {
         if (claim == null) {
             return;
@@ -1905,56 +1882,31 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
         long newSize = 0L;
-        long claimOffset = 0L;
+        final long claimOffset = 0L;
 
         ContentClaim newClaim = null;
         final LongHolder writtenHolder = new LongHolder(0L);
         final boolean appendToClaim = isMergeContent();
         try {
-            if (appendToClaim) {
-                enforceCurrentWriteClaimState();
-                claimOffset = currentWriteClaimSize;
-                newClaim = currentWriteClaim;
-                ensureNotAppending(newClaim);
-
-                try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream);
-                    final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
-
-                    recursionSet.add(source);
-
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(new FlowFileAccessOutputStream(countingOut, source));
-                    } finally {
-                        writeRecursionLevel--;
-                    }
-                } finally {
-                    recursionSet.remove(source);
-                }
-
-                final long writtenThisCall = writtenHolder.getValue();
-                newSize = writtenThisCall;
-                currentWriteClaimSize += newSize;
-            } else {
-                newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
+            newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+            claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
 
-                ensureNotAppending(newClaim);
-                try (final OutputStream stream = context.getContentRepository().write(newClaim);
-                    final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) {
-                    recursionSet.add(source);
+            ensureNotAppending(newClaim);
+            try (final OutputStream stream = context.getContentRepository().write(newClaim);
+                final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
+                final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
+                recursionSet.add(source);
 
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(new FlowFileAccessOutputStream(countingOut, source));
-                    } finally {
-                        writeRecursionLevel--;
-                    }
+                writeRecursionLevel++;
+                try {
+                    writer.process(new FlowFileAccessOutputStream(countingOut, source));
                 } finally {
-                    recursionSet.remove(source);
+                    writeRecursionLevel--;
                 }
-                newSize = context.getContentRepository().size(newClaim);
+            } finally {
+                recursionSet.remove(source);
             }
+            newSize = context.getContentRepository().size(newClaim);
         } catch (final ContentNotFoundException nfe) {
             if (appendToClaim) {
                 resetWriteClaims(); // need to reset write claim before we can remove the claim
@@ -2115,18 +2067,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private void resetWriteClaims() {
-        try {
-            if (currentWriteClaimStream != null) {
-                currentWriteClaimStream.flush();
-                currentWriteClaimStream.close();
-            }
-        } catch (final Exception e) {
-        }
-        currentWriteClaimStream = null;
-        currentWriteClaim = null;
-        currentWriteClaimFlowFileCount = 0;
-        currentWriteClaimSize = 0L;
-
         for (final ByteCountingOutputStream out : appendableStreams.values()) {
             try {
                 out.flush();
@@ -2166,116 +2106,60 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         ContentClaim newClaim = null;
         long newSize = 0L;
-        long claimOffset = 0L;
+        final long claimOffset = 0L;
         final LongHolder writtenHolder = new LongHolder(0L);
-        final boolean appendToClaim = isMergeContent();
         try {
-            if (appendToClaim) {
-                enforceCurrentWriteClaimState();
-                claimOffset = currentWriteClaimSize;
-                newClaim = currentWriteClaim;
-                ensureNotAppending(newClaim);
-
-                try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset());
-                    final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
-                    final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
-                    final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
-                    final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream);
-                    final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
-
-                    recursionSet.add(source);
-
-                    // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
-                    // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
-                    // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
-                    // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
-                    // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
-                    final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim);
-                    boolean cnfeThrown = false;
-
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source));
-                    } catch (final ContentNotFoundException cnfe) {
-                        cnfeThrown = true;
-                        throw cnfe;
-                    } finally {
-                        writeRecursionLevel--;
-                        recursionSet.remove(source);
-
-                        // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
-                        if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
-                            throw ffais.getContentNotFoundException();
-                        }
-                    }
-                }
-
-                final long writtenThisCall = writtenHolder.getValue();
-                newSize = writtenThisCall;
-                currentWriteClaimSize += writtenThisCall;
-            } else {
-                newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
+            newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+            claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
 
-                ensureNotAppending(newClaim);
+            ensureNotAppending(newClaim);
 
-                try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset());
-                    final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
-                    final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead);
-                    final OutputStream os = context.getContentRepository().write(newClaim);
-                    final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) {
+            try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset());
+                final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
+                final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+                final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
+                final OutputStream os = context.getContentRepository().write(newClaim);
+                final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
+                final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
 
-                    recursionSet.add(source);
+                recursionSet.add(source);
 
-                    // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
-                    // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
-                    // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
-                    // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
-                    // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
-                    final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim);
-                    boolean cnfeThrown = false;
+                // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
+                // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
+                // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
+                // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
+                // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
+                final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim);
+                boolean cnfeThrown = false;
 
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source));
-                    } catch (final ContentNotFoundException cnfe) {
-                        cnfeThrown = true;
-                        throw cnfe;
-                    } finally {
-                        writeRecursionLevel--;
-                        recursionSet.remove(source);
+                writeRecursionLevel++;
+                try {
+                    writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source));
+                } catch (final ContentNotFoundException cnfe) {
+                    cnfeThrown = true;
+                    throw cnfe;
+                } finally {
+                    writeRecursionLevel--;
+                    recursionSet.remove(source);
 
-                        // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
-                        if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
-                            throw ffais.getContentNotFoundException();
-                        }
+                    // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
+                    if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
+                        throw ffais.getContentNotFoundException();
                     }
                 }
-
-                newSize = context.getContentRepository().size(newClaim);
             }
+
+            newSize = context.getContentRepository().size(newClaim);
         } catch (final ContentNotFoundException nfe) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can remove the claim
-            }
             destroyContent(newClaim);
             handleContentNotFound(nfe, record);
         } catch (final IOException ioe) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can remove the claim
-            }
             destroyContent(newClaim);
             throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final FlowFileAccessException ffae) {
-            if (appendToClaim) {
-                resetWriteClaims();
-            }
             destroyContent(newClaim);
             throw ffae;
         } catch (final Throwable t) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can remove the claim
-            }
             destroyContent(newClaim);
             throw t;
         } finally {
@@ -2302,33 +2186,21 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final ContentClaim newClaim;
         final long claimOffset;
 
-        final boolean appendToClaim = isMergeContent();
-        if (appendToClaim) {
-            enforceCurrentWriteClaimState();
-            newClaim = currentWriteClaim;
-            claimOffset = currentWriteClaimSize;
-        } else {
-            try {
-                newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
-            } catch (final IOException e) {
-                throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
-            }
-
-            claimOffset = 0L;
+        try {
+            newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+            claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
+        } catch (final IOException e) {
+            throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
         }
 
+        claimOffset = 0L;
         long newSize = 0L;
         try {
             final boolean append = isMergeContent();
             newSize = context.getContentRepository().importFrom(source, newClaim, append);
             bytesWritten.increment(newSize);
             bytesRead.increment(newSize);
-            currentWriteClaimSize += newSize;
         } catch (final Throwable t) {
-            if (appendToClaim) {
-                resetWriteClaims();
-            }
             destroyContent(newClaim);
             throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
         }
@@ -2351,31 +2223,19 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         validateRecordState(destination);
         final StandardRepositoryRecord record = records.get(destination);
         ContentClaim newClaim = null;
-        long claimOffset = 0L;
+        final long claimOffset = 0L;
 
         final long newSize;
         final boolean appendToClaim = isMergeContent();
         try {
-            if (appendToClaim) {
-                enforceCurrentWriteClaimState();
-                newClaim = currentWriteClaim;
-                claimOffset = currentWriteClaimSize;
-
-                final long bytesCopied = StreamUtils.copy(source, currentWriteClaimStream);
-                bytesWritten.increment(bytesCopied);
-                currentWriteClaimSize += bytesCopied;
-                newSize = bytesCopied;
-            } else {
-                try {
-                    newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                    claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
+            try {
+                newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+                claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
 
-                    newSize = context.getContentRepository().importFrom(source, newClaim, appendToClaim);
-                    bytesWritten.increment(newSize);
-                    currentWriteClaimSize += newSize;
-                } catch (final IOException e) {
-                    throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
-                }
+                newSize = context.getContentRepository().importFrom(source, newClaim, appendToClaim);
+                bytesWritten.increment(newSize);
+            } catch (final IOException e) {
+                throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
             }
         } catch (final Throwable t) {
             if (appendToClaim) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 3bfdd8a..6c1626c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -27,8 +27,10 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -38,7 +40,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream;
 import org.apache.nifi.controller.repository.io.MemoryManager;
@@ -92,7 +95,7 @@ public class VolatileContentRepository implements ContentRepository {
     private final ConcurrentMap<ContentClaim, ContentClaim> backupRepoClaimMap = new ConcurrentHashMap<>(256);
     private final AtomicReference<ContentRepository> backupRepositoryRef = new AtomicReference<>(null);
 
-    private ContentClaimManager claimManager; // effectively final
+    private ResourceClaimManager claimManager; // effectively final
 
     public VolatileContentRepository() {
         this(NiFiProperties.getInstance());
@@ -119,7 +122,7 @@ public class VolatileContentRepository implements ContentRepository {
     }
 
     @Override
-    public void initialize(final ContentClaimManager claimManager) {
+    public void initialize(final ResourceClaimManager claimManager) {
         this.claimManager = claimManager;
 
         for (int i = 0; i < 3; i++) {
@@ -199,9 +202,10 @@ public class VolatileContentRepository implements ContentRepository {
 
     private ContentClaim createLossTolerant() {
         final long id = idGenerator.getAndIncrement();
-        final ContentClaim claim = claimManager.newContentClaim(CONTAINER_NAME, "section", String.valueOf(id), true);
+        final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true);
+        final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L);
         final ContentBlock contentBlock = new ContentBlock(claim, repoSize);
-        claimManager.incrementClaimantCount(claim, true);
+        claimManager.incrementClaimantCount(resourceClaim, true);
 
         claimMap.put(claim, contentBlock);
 
@@ -216,7 +220,7 @@ public class VolatileContentRepository implements ContentRepository {
         }
         final ContentClaim backupClaim = getBackupClaim(claim);
         if (backupClaim == null) {
-            return claimManager.incrementClaimantCount(resolveClaim(claim));
+            return claimManager.incrementClaimantCount(resolveClaim(claim).getResourceClaim());
         } else {
             return getBackupRepository().incrementClaimaintCount(backupClaim);
         }
@@ -230,7 +234,7 @@ public class VolatileContentRepository implements ContentRepository {
 
         final ContentClaim backupClaim = getBackupClaim(claim);
         if (backupClaim == null) {
-            return claimManager.decrementClaimantCount(resolveClaim(claim));
+            return claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim());
         } else {
             return getBackupRepository().decrementClaimantCount(backupClaim);
         }
@@ -244,7 +248,7 @@ public class VolatileContentRepository implements ContentRepository {
 
         final ContentClaim backupClaim = getBackupClaim(claim);
         if (backupClaim == null) {
-            return claimManager.getClaimantCount(resolveClaim(claim));
+            return claimManager.getClaimantCount(resolveClaim(claim).getResourceClaim());
         } else {
             return getBackupRepository().getClaimantCount(backupClaim);
         }
@@ -273,6 +277,29 @@ public class VolatileContentRepository implements ContentRepository {
         return true;
     }
 
+    private boolean remove(final ResourceClaim claim) {
+        if (claim == null) {
+            return false;
+        }
+
+        final Set<ContentClaim> contentClaims = new HashSet<>();
+        for (final Map.Entry<ContentClaim, ContentBlock> entry : claimMap.entrySet()) {
+            final ContentClaim contentClaim = entry.getKey();
+            if (contentClaim.getResourceClaim().equals(claim)) {
+                contentClaims.add(contentClaim);
+            }
+        }
+
+        boolean removed = false;
+        for (final ContentClaim contentClaim : contentClaims) {
+            if (remove(contentClaim)) {
+                removed = true;
+            }
+        }
+
+        return removed;
+    }
+
     @Override
     public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException {
         final ContentClaim createdClaim = create(lossTolerant);
@@ -435,7 +462,7 @@ public class VolatileContentRepository implements ContentRepository {
     @Override
     public void purge() {
         for (final ContentClaim claim : claimMap.keySet()) {
-            claimManager.decrementClaimantCount(resolveClaim(claim));
+            claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim());
             final ContentClaim backup = getBackupClaim(claim);
             if (backup != null) {
                 getBackupRepository().remove(backup);
@@ -624,7 +651,7 @@ public class VolatileContentRepository implements ContentRepository {
 
         @Override
         public void run() {
-            final List<ContentClaim> destructable = new ArrayList<>(1000);
+            final List<ResourceClaim> destructable = new ArrayList<>(1000);
             while (true) {
                 destructable.clear();
                 claimManager.drainDestructableClaims(destructable, 1000, 5, TimeUnit.SECONDS);
@@ -632,7 +659,7 @@ public class VolatileContentRepository implements ContentRepository {
                     return;
                 }
 
-                for (final ContentClaim claim : destructable) {
+                for (final ResourceClaim claim : destructable) {
                     remove(claim);
                 }
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index fe34fe0..a85b23b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -22,7 +22,9 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 /**
  * <p>
@@ -32,10 +34,10 @@ import org.apache.nifi.controller.repository.claim.ContentClaimManager;
 public class VolatileFlowFileRepository implements FlowFileRepository {
 
     private final AtomicLong idGenerator = new AtomicLong(0L);
-    private ContentClaimManager claimManager; // effectively final
+    private ResourceClaimManager claimManager; // effectively final
 
     @Override
-    public void initialize(final ContentClaimManager claimManager) {
+    public void initialize(final ResourceClaimManager claimManager) {
         this.claimManager = claimManager;
     }
 
@@ -58,23 +60,49 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
     public void close() throws IOException {
     }
 
+    private void markDestructable(final ContentClaim contentClaim) {
+        if (contentClaim == null) {
+            return;
+        }
+
+        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+        if (resourceClaim == null) {
+            return;
+        }
+
+        claimManager.markDestructable(resourceClaim);
+    }
+
+    private int getClaimantCount(final ContentClaim claim) {
+        if (claim == null) {
+            return 0;
+        }
+
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+        if (resourceClaim == null) {
+            return 0;
+        }
+
+        return claimManager.getClaimantCount(resourceClaim);
+    }
+
     @Override
     public void updateRepository(final Collection<RepositoryRecord> records) throws IOException {
         for (final RepositoryRecord record : records) {
             if (record.getType() == RepositoryRecordType.DELETE) {
                 // For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable
-                if (record.getCurrentClaim() != null && claimManager.getClaimantCount(record.getCurrentClaim()) <= 0) {
-                    claimManager.markDestructable(record.getCurrentClaim());
+                if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) {
+                    markDestructable(record.getCurrentClaim());
                 }
 
                 // If the original claim is different than the current claim and the original claim has a claimant count <= 0, mark it as destructable.
-                if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
-                    claimManager.markDestructable(record.getOriginalClaim());
+                if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && getClaimantCount(record.getOriginalClaim()) <= 0) {
+                    markDestructable(record.getOriginalClaim());
                 }
             } else if (record.getType() == RepositoryRecordType.UPDATE) {
                 // if we have an update, and the original is no longer needed, mark original as destructable
-                if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
-                    claimManager.markDestructable(record.getOriginalClaim());
+                if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && getClaimantCount(record.getOriginalClaim()) <= 0) {
+                    markDestructable(record.getOriginalClaim());
                 }
             }
         }