You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/07/29 23:02:19 UTC

[2/4] nifi git commit: NIFI-744: Refactored ContentClaim into ContentClaim and ResourceClaim so that we can append to a single file in the FileSystemRepository even after a session is completed

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 1171636..3a6338c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -23,14 +23,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
-import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
@@ -61,18 +58,20 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.controller.repository.io.SyncOnCloseOutputStream;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.LongHolder;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.StopWatch;
-
-import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.util.file.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,8 +91,11 @@ public class FileSystemRepository implements ContentRepository {
     private final AtomicLong index;
 
     private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true);
-    private final ConcurrentMap<String, BlockingQueue<ContentClaim>> reclaimable = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, BlockingQueue<ResourceClaim>> reclaimable = new ConcurrentHashMap<>();
     private final Map<String, ContainerState> containerStateMap = new HashMap<>();
+    private final long maxAppendClaimLength = 1024L * 1024L; // 1 MB
+    private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100);
+    private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100);
 
     private final boolean archiveData;
     private final long maxArchiveMillis;
@@ -101,7 +103,7 @@ public class FileSystemRepository implements ContentRepository {
     private final boolean alwaysSync;
     private final ScheduledExecutorService containerCleanupExecutor;
 
-    private ContentClaimManager contentClaimManager; // effectively final
+    private ResourceClaimManager resourceClaimManager; // effectively final
 
     // Map of contianer to archived files that should be deleted next.
     private final Map<String, BlockingQueue<ArchiveInfo>> archivedFiles = new HashMap<>();
@@ -113,7 +115,7 @@ public class FileSystemRepository implements ContentRepository {
         final NiFiProperties properties = NiFiProperties.getInstance();
         // determine the file repository paths and ensure they exist
         final Map<String, Path> fileRespositoryPaths = properties.getContentRepositoryPaths();
-        for (Path path : fileRespositoryPaths.values()) {
+        for (final Path path : fileRespositoryPaths.values()) {
             Files.createDirectories(path);
         }
 
@@ -122,7 +124,7 @@ public class FileSystemRepository implements ContentRepository {
         index = new AtomicLong(0L);
 
         for (final String containerName : containerNames) {
-            reclaimable.put(containerName, new LinkedBlockingQueue<ContentClaim>(10000));
+            reclaimable.put(containerName, new LinkedBlockingQueue<ResourceClaim>(10000));
             archivedFiles.put(containerName, new LinkedBlockingQueue<ArchiveInfo>(100000));
         }
 
@@ -196,8 +198,8 @@ public class FileSystemRepository implements ContentRepository {
     }
 
     @Override
-    public void initialize(final ContentClaimManager claimManager) {
-        this.contentClaimManager = claimManager;
+    public void initialize(final ResourceClaimManager claimManager) {
+        this.resourceClaimManager = claimManager;
 
         final NiFiProperties properties = NiFiProperties.getInstance();
 
@@ -231,6 +233,13 @@ public class FileSystemRepository implements ContentRepository {
     public void shutdown() {
         executor.shutdown();
         containerCleanupExecutor.shutdown();
+
+        for (final OutputStream out : writableClaimStreams.values()) {
+            try {
+                out.close();
+            } catch (final IOException ioe) {
+            }
+        }
     }
 
     private static double getRatio(final String value) {
@@ -397,8 +406,8 @@ public class FileSystemRepository implements ContentRepository {
         final String id = idPath.toFile().getName();
         final String sectionName = sectionPath.toFile().getName();
 
-        final ContentClaim contentClaim = contentClaimManager.newContentClaim(containerName, sectionName, id, false);
-        if (contentClaimManager.getClaimantCount(contentClaim) == 0) {
+        final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false);
+        if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) {
             removeIncompleteContent(fileToRemove);
         }
     }
@@ -427,15 +436,21 @@ public class FileSystemRepository implements ContentRepository {
     }
 
     private Path getPath(final ContentClaim claim) {
-        final Path containerPath = containers.get(claim.getContainer());
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+        return getPath(resourceClaim);
+    }
+
+    private Path getPath(final ResourceClaim resourceClaim) {
+        final Path containerPath = containers.get(resourceClaim.getContainer());
         if (containerPath == null) {
             return null;
         }
-        return containerPath.resolve(claim.getSection()).resolve(claim.getId());
+        return containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
     }
 
     private Path getPath(final ContentClaim claim, final boolean verifyExists) throws ContentNotFoundException {
-        final Path containerPath = containers.get(claim.getContainer());
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+        final Path containerPath = containers.get(resourceClaim.getContainer());
         if (containerPath == null) {
             if (verifyExists) {
                 throw new ContentNotFoundException(claim);
@@ -445,11 +460,11 @@ public class FileSystemRepository implements ContentRepository {
         }
 
         // Create the Path that points to the data
-        Path resolvedPath = containerPath.resolve(claim.getSection()).resolve(String.valueOf(claim.getId()));
+        Path resolvedPath = containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
 
         // If the data does not exist, create a Path that points to where the data would exist in the archive directory.
         if (!Files.exists(resolvedPath)) {
-            resolvedPath = getArchivePath(claim);
+            resolvedPath = getArchivePath(claim.getResourceClaim());
         }
 
         if (verifyExists && !Files.exists(resolvedPath)) {
@@ -460,34 +475,55 @@ public class FileSystemRepository implements ContentRepository {
 
     @Override
     public ContentClaim create(final boolean lossTolerant) throws IOException {
-        final long currentIndex = index.incrementAndGet();
-
-        String containerName = null;
-        boolean waitRequired = true;
-        ContainerState containerState = null;
-        for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) {
-            final long modulatedContainerIndex = containerIndex % containers.size();
-            containerName = containerNames.get((int) modulatedContainerIndex);
-
-            containerState = containerStateMap.get(containerName);
-            if (!containerState.isWaitRequired()) {
-                waitRequired = false;
-                break;
-            }
-        }
+        ResourceClaim resourceClaim;
+
+        // We need to synchronize on this queue because the act of pulling something off
+        // the queue and incrementing the associated claimant count MUST be done atomically.
+        // This way, if the claimant count is decremented to 0, we can ensure that the
+        // claim is not then pulled from the queue and used as another thread is destroying/archiving
+        // the claim.
+        final long resourceOffset;
+        synchronized (writableClaimQueue) {
+            final ClaimLengthPair pair = writableClaimQueue.poll();
+            if (pair == null) {
+                final long currentIndex = index.incrementAndGet();
+
+                String containerName = null;
+                boolean waitRequired = true;
+                ContainerState containerState = null;
+                for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) {
+                    final long modulatedContainerIndex = containerIndex % containers.size();
+                    containerName = containerNames.get((int) modulatedContainerIndex);
+
+                    containerState = containerStateMap.get(containerName);
+                    if (!containerState.isWaitRequired()) {
+                        waitRequired = false;
+                        break;
+                    }
+                }
 
-        if (waitRequired) {
-            containerState.waitForArchiveExpiration();
-        }
+                if (waitRequired) {
+                    containerState.waitForArchiveExpiration();
+                }
 
-        final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER;
-        final String section = String.valueOf(modulatedSectionIndex);
-        final String claimId = System.currentTimeMillis() + "-" + currentIndex;
+                final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER;
+                final String section = String.valueOf(modulatedSectionIndex);
+                final String claimId = System.currentTimeMillis() + "-" + currentIndex;
 
-        final ContentClaim claim = contentClaimManager.newContentClaim(containerName, section, claimId, lossTolerant);
-        contentClaimManager.incrementClaimantCount(claim, true);
+                resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant);
+                resourceOffset = 0L;
+                LOG.debug("Creating new Resource Claim {}", resourceClaim);
+            } else {
+                resourceClaim = pair.getClaim();
+                resourceOffset = pair.getLength();
+                LOG.debug("Reusing Resource Claim {}", resourceClaim);
+            }
+
+            resourceClaimManager.incrementClaimantCount(resourceClaim, true);
+        }
 
-        return claim;
+        final StandardContentClaim scc = new StandardContentClaim(resourceClaim, resourceOffset);
+        return scc;
     }
 
     @Override
@@ -496,7 +532,7 @@ public class FileSystemRepository implements ContentRepository {
             return 0;
         }
 
-        return contentClaimManager.incrementClaimantCount(claim);
+        return resourceClaimManager.incrementClaimantCount(claim.getResourceClaim());
     }
 
     @Override
@@ -504,7 +540,7 @@ public class FileSystemRepository implements ContentRepository {
         if (claim == null) {
             return 0;
         }
-        return contentClaimManager.getClaimantCount(claim);
+        return resourceClaimManager.getClaimantCount(claim.getResourceClaim());
     }
 
     @Override
@@ -513,7 +549,7 @@ public class FileSystemRepository implements ContentRepository {
             return 0;
         }
 
-        final int claimantCount = contentClaimManager.decrementClaimantCount(claim);
+        final int claimantCount = resourceClaimManager.decrementClaimantCount(claim.getResourceClaim());
         return claimantCount;
     }
 
@@ -523,9 +559,30 @@ public class FileSystemRepository implements ContentRepository {
             return false;
         }
 
+        return remove(claim.getResourceClaim());
+    }
+
+    private boolean remove(final ResourceClaim claim) {
+        if (claim == null) {
+            return false;
+        }
+
+        // we synchronize on the queue here because if the claimant count is 0,
+        // we need to be able to remove any instance of that resource claim from the
+        // queue atomically (i.e., the checking of the claimant count plus removal from the queue
+        // must be atomic)
+        synchronized (writableClaimQueue) {
+            final int claimantCount = resourceClaimManager.getClaimantCount(claim);
+            if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+                // if other content claims are claiming the same resource, we have nothing to destroy,
+                // so just consider the destruction successful.
+                return true;
+            }
+        }
+
         Path path = null;
         try {
-            path = getPath(claim, false);
+            path = getPath(claim);
         } catch (final ContentNotFoundException cnfe) {
         }
 
@@ -538,6 +595,7 @@ public class FileSystemRepository implements ContentRepository {
         return true;
     }
 
+
     @Override
     public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException {
         if (original == null) {
@@ -545,14 +603,11 @@ public class FileSystemRepository implements ContentRepository {
         }
 
         final ContentClaim newClaim = create(lossTolerant);
-        final Path currPath = getPath(original, true);
-        final Path newPath = getPath(newClaim);
-        try (final FileOutputStream fos = new FileOutputStream(newPath.toFile())) {
-            Files.copy(currPath, fos);
-            if (alwaysSync) {
-                fos.getFD().sync();
-            }
+        try (final InputStream in = read(original);
+            final OutputStream out = write(newClaim)) {
+            StreamUtils.copy(in, out);
         } catch (final IOException ioe) {
+            decrementClaimantCount(newClaim);
             remove(newClaim);
             throw ioe;
         }
@@ -564,44 +619,28 @@ public class FileSystemRepository implements ContentRepository {
         if (claims.contains(destination)) {
             throw new IllegalArgumentException("destination cannot be within claims");
         }
-        try (final FileChannel dest = FileChannel.open(getPath(destination), StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
-            long position = 0L;
-            if (header != null && header.length > 0) {
-                final ByteBuffer buffer = ByteBuffer.wrap(header);
-                while (buffer.hasRemaining()) {
-                    position += dest.write(buffer, position);
-                }
+
+        try (final ByteCountingOutputStream out = new ByteCountingOutputStream(write(destination))) {
+            if (header != null) {
+                out.write(header);
             }
-            int objectIndex = 0;
+
+            int i = 0;
             for (final ContentClaim claim : claims) {
-                long totalCopied = 0L;
-                try (final FileChannel src = FileChannel.open(getPath(claim, true), StandardOpenOption.READ)) {
-                    while (totalCopied < src.size()) {
-                        final long copiedThisIteration = dest.transferFrom(src, position, Long.MAX_VALUE);
-                        totalCopied += copiedThisIteration;
-                        position += copiedThisIteration;
-                    }
+                try (final InputStream in = read(claim)) {
+                    StreamUtils.copy(in, out);
                 }
-                // don't add demarcator after the last claim
-                if (demarcator != null && demarcator.length > 0 && (++objectIndex < claims.size())) {
-                    final ByteBuffer buffer = ByteBuffer.wrap(demarcator);
-                    while (buffer.hasRemaining()) {
-                        position += dest.write(buffer, position);
-                    }
-                }
-            }
-            if (footer != null && footer.length > 0) {
-                final ByteBuffer buffer = ByteBuffer.wrap(footer);
-                while (buffer.hasRemaining()) {
-                    position += dest.write(buffer, position);
+
+                if (++i < claims.size() && demarcator != null) {
+                    out.write(demarcator);
                 }
             }
 
-            if (alwaysSync) {
-                dest.force(true);
+            if (footer != null) {
+                out.write(footer);
             }
 
-            return position;
+            return out.getBytesWritten();
         }
     }
 
@@ -624,12 +663,8 @@ public class FileSystemRepository implements ContentRepository {
 
     @Override
     public long importFrom(final InputStream content, final ContentClaim claim, final boolean append) throws IOException {
-        try (final FileOutputStream out = new FileOutputStream(getPath(claim).toFile(), append)) {
-            final long copied = StreamUtils.copy(content, out);
-            if (alwaysSync) {
-                out.getFD().sync();
-            }
-            return copied;
+        try (final OutputStream out = write(claim, append)) {
+            return StreamUtils.copy(content, out);
         }
     }
 
@@ -642,20 +677,14 @@ public class FileSystemRepository implements ContentRepository {
             Files.createFile(destination);
             return 0L;
         }
-        if (append) {
-            try (final FileChannel sourceChannel = FileChannel.open(getPath(claim, true), StandardOpenOption.READ);
-                    final FileChannel destinationChannel = FileChannel.open(destination, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
-                long position = destinationChannel.size();
-                final long targetSize = position + sourceChannel.size();
-                while (position < targetSize) {
-                    final long bytesCopied = destinationChannel.transferFrom(sourceChannel, position, Long.MAX_VALUE);
-                    position += bytesCopied;
-                }
-                return position;
+
+        try (final InputStream in = read(claim);
+            final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) {
+            final long copied = StreamUtils.copy(in, fos);
+            if (alwaysSync) {
+                fos.getFD().sync();
             }
-        } else {
-            Files.copy(getPath(claim, true), destination, StandardCopyOption.REPLACE_EXISTING);
-            return Files.size(destination);
+            return copied;
         }
     }
 
@@ -674,28 +703,20 @@ public class FileSystemRepository implements ContentRepository {
 
         final long claimSize = size(claim);
         if (offset > claimSize) {
-            throw new IllegalArgumentException("offset of " + offset + " exceeds claim size of " + claimSize);
+            throw new IllegalArgumentException("Offset of " + offset + " exceeds claim size of " + claimSize);
 
         }
 
-        if (append) {
-            try (final InputStream sourceStream = Files.newInputStream(getPath(claim, true), StandardOpenOption.READ);
-                    final OutputStream destinationStream = Files.newOutputStream(destination, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
-                StreamUtils.skip(sourceStream, offset);
-
-                final byte[] buffer = new byte[8192];
-                int len;
-                long copied = 0L;
-                while ((len = sourceStream.read(buffer, 0, (int) Math.min(length - copied, buffer.length))) > 0) {
-                    destinationStream.write(buffer, 0, len);
-                    copied += len;
-                }
-                return copied;
+        try (final InputStream in = read(claim);
+            final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) {
+            if (offset > 0) {
+                StreamUtils.skip(in, offset);
             }
-        } else {
-            try (final OutputStream out = Files.newOutputStream(destination, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
-                return exportTo(claim, out, offset, length);
+            StreamUtils.copy(in, fos, length);
+            if (alwaysSync) {
+                fos.getFD().sync();
             }
+            return length;
         }
     }
 
@@ -704,7 +725,10 @@ public class FileSystemRepository implements ContentRepository {
         if (claim == null) {
             return 0L;
         }
-        return Files.copy(getPath(claim, true), destination);
+
+        try (final InputStream in = read(claim)) {
+            return StreamUtils.copy(in, destination);
+        }
     }
 
     @Override
@@ -719,7 +743,7 @@ public class FileSystemRepository implements ContentRepository {
         if (offset == 0 && length == claimSize) {
             return exportTo(claim, destination);
         }
-        try (final InputStream in = Files.newInputStream(getPath(claim, true))) {
+        try (final InputStream in = read(claim)) {
             StreamUtils.skip(in, offset);
             final byte[] buffer = new byte[8192];
             int len;
@@ -738,7 +762,12 @@ public class FileSystemRepository implements ContentRepository {
             return 0L;
         }
 
-        return Files.size(getPath(claim, true));
+        // see javadocs for claim.getLength() as to why we do this.
+        if (claim.getLength() < 0) {
+            return Files.size(getPath(claim, true));
+        }
+
+        return claim.getLength();
     }
 
     @Override
@@ -747,16 +776,198 @@ public class FileSystemRepository implements ContentRepository {
             return new ByteArrayInputStream(new byte[0]);
         }
         final Path path = getPath(claim, true);
-        return new FileInputStream(path.toFile());
+        final FileInputStream fis = new FileInputStream(path.toFile());
+        if (claim.getOffset() > 0L) {
+            StreamUtils.skip(fis, claim.getOffset());
+        }
+
+        // see javadocs for claim.getLength() as to why we do this.
+        if (claim.getLength() >= 0) {
+            return new LimitedInputStream(fis, claim.getLength());
+        } else {
+            return fis;
+        }
     }
 
     @Override
-    @SuppressWarnings("resource")
     public OutputStream write(final ContentClaim claim) throws IOException {
-        final FileOutputStream fos = new FileOutputStream(getPath(claim).toFile());
-        return alwaysSync ? new SyncOnCloseOutputStream(fos) : fos;
+        return write(claim, false);
     }
 
+    private OutputStream write(final ContentClaim claim, final boolean append) throws IOException {
+        if (claim == null) {
+            throw new NullPointerException("ContentClaim cannot be null");
+        }
+
+        if (!(claim instanceof StandardContentClaim)) {
+            // we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything
+            // else, just throw an Exception because it is not valid for this Repository
+            throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does belong to this Content Repository");
+        }
+
+        final StandardContentClaim scc = (StandardContentClaim) claim;
+
+        // we always append because there may be another ContentClaim using the same resource claim.
+        // However, we know that we will never write to the same claim from two different threads
+        // at the same time because we will call create() to get the claim before we write to it,
+        // and when we call create(), it will remove it from the Queue, which means that no other
+        // thread will get the same Claim until we've finished writing to it.
+        ByteCountingOutputStream claimStream = writableClaimStreams.remove(scc.getResourceClaim());
+        final long initialLength;
+        if (claimStream == null) {
+            final File file = getPath(scc).toFile();
+            claimStream = new ByteCountingOutputStream(new FileOutputStream(file, true), file.length());
+            initialLength = 0L;
+        } else {
+            if (append) {
+                initialLength = Math.max(0, scc.getLength());
+            } else {
+                initialLength = 0;
+                scc.setOffset(claimStream.getBytesWritten());
+            }
+        }
+
+        final ByteCountingOutputStream bcos = claimStream;
+        final OutputStream out = new OutputStream() {
+            private long bytesWritten = 0L;
+            private boolean recycle = true;
+            private boolean closed = false;
+
+            @Override
+            public String toString() {
+                return "FileSystemRepository Stream [" + scc + "]";
+            }
+
+            @Override
+            public synchronized void write(final int b) throws IOException {
+                if (closed) {
+                    throw new IOException("Stream is closed");
+                }
+
+                try {
+                    bcos.write(b);
+                } catch (final IOException ioe) {
+                    recycle = false;
+                    throw new IOException("Failed to write to " + this, ioe);
+                }
+
+                bytesWritten++;
+                scc.setLength(bytesWritten + initialLength);
+            }
+
+            @Override
+            public synchronized void write(final byte[] b) throws IOException {
+                if (closed) {
+                    throw new IOException("Stream is closed");
+                }
+
+                try {
+                    bcos.write(b);
+                } catch (final IOException ioe) {
+                    recycle = false;
+                    throw new IOException("Failed to write to " + this, ioe);
+                }
+
+                bytesWritten += b.length;
+                scc.setLength(bytesWritten + initialLength);
+            }
+
+            @Override
+            public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
+                if (closed) {
+                    throw new IOException("Stream is closed");
+                }
+
+                try {
+                    bcos.write(b, off, len);
+                } catch (final IOException ioe) {
+                    recycle = false;
+                    throw new IOException("Failed to write to " + this, ioe);
+                }
+
+                bytesWritten += len;
+                scc.setLength(bytesWritten + initialLength);
+            }
+
+            @Override
+            public synchronized void flush() throws IOException {
+                if (closed) {
+                    throw new IOException("Stream is closed");
+                }
+
+                bcos.flush();
+            }
+
+            @Override
+            public synchronized void close() throws IOException {
+                closed = true;
+
+                if (alwaysSync) {
+                    ((FileOutputStream) bcos.getWrappedStream()).getFD().sync();
+                }
+
+                if (scc.getLength() < 0) {
+                    // If claim was not written to, set length to 0
+                    scc.setLength(0L);
+                }
+
+                // if we've not yet hit the threshold for appending to a resource claim, add the claim
+                // to the writableClaimQueue so that the Resource Claim can be used again when create()
+                // is called. In this case, we don't have to actually close the file stream. Instead, we
+                // can just add it onto the queue and continue to use it for the next content claim.
+                final long resourceClaimLength = scc.getOffset() + scc.getLength();
+                if (recycle && resourceClaimLength < maxAppendClaimLength) {
+                    // we do not have to synchronize on the writable claim queue here because we
+                    // are only adding something to the queue. We must synchronize if we are
+                    // using a ResourceClaim from the queue and incrementing the claimant count on that resource
+                    // because those need to be done atomically, or if we are destroying a claim that is on
+                    // the queue because we need to ensure that the latter operation does not cause problems
+                    // with the former.
+                    final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength);
+                    final boolean enqueued;
+                    if (writableClaimQueue.contains(pair)) {
+                        // may already exist on the queue, if the content claim is written to multiple times.
+                        enqueued = true;
+                    } else {
+                        enqueued = writableClaimQueue.offer(pair);
+                    }
+
+                    if (enqueued) {
+                        writableClaimStreams.put(scc.getResourceClaim(), bcos);
+                        LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
+                    } else {
+                        writableClaimStreams.remove(scc.getResourceClaim());
+                        bcos.close();
+
+                        LOG.debug("Claim length less than max; Closing {}", this);
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
+                        }
+                    }
+                } else {
+                    // we've reached the limit for this claim. Don't add it back to our queue.
+                    // Instead, just remove it and move on.
+                    writableClaimStreams.remove(scc.getResourceClaim());
+                    // ensure that the claim is no longer on the queue
+                    writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
+                    bcos.close();
+                    LOG.debug("Claim lenth >= max; Closing {}", this);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
+                    }
+                }
+            }
+        };
+
+        LOG.debug("Writing to {}", out);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for writing to " + out));
+        }
+
+        return out;
+    }
+
+
     @Override
     public void purge() {
         // delete all content from repositories
@@ -788,7 +999,7 @@ public class FileSystemRepository implements ContentRepository {
             }
         }
 
-        contentClaimManager.purge();
+        resourceClaimManager.purge();
     }
 
     private class BinDestructableClaims implements Runnable {
@@ -800,17 +1011,17 @@ public class FileSystemRepository implements ContentRepository {
                 // because the Container generally maps to a physical partition on the disk, so we want a few
                 // different threads hitting the different partitions but don't want multiple threads hitting
                 // the same partition.
-                final List<ContentClaim> toDestroy = new ArrayList<>();
+                final List<ResourceClaim> toDestroy = new ArrayList<>();
                 while (true) {
                     toDestroy.clear();
-                    contentClaimManager.drainDestructableClaims(toDestroy, 10000);
+                    resourceClaimManager.drainDestructableClaims(toDestroy, 10000);
                     if (toDestroy.isEmpty()) {
                         return;
                     }
 
-                    for (final ContentClaim claim : toDestroy) {
+                    for (final ResourceClaim claim : toDestroy) {
                         final String container = claim.getContainer();
-                        final BlockingQueue<ContentClaim> claimQueue = reclaimable.get(container);
+                        final BlockingQueue<ResourceClaim> claimQueue = reclaimable.get(container);
 
                         try {
                             while (true) {
@@ -838,7 +1049,7 @@ public class FileSystemRepository implements ContentRepository {
         return sectionPath.resolve(ARCHIVE_DIR_NAME).resolve(claimId);
     }
 
-    private Path getArchivePath(final ContentClaim claim) {
+    private Path getArchivePath(final ResourceClaim claim) {
         final String claimId = claim.getId();
         final Path containerPath = containers.get(claim.getContainer());
         final Path archivePath = containerPath.resolve(claim.getSection()).resolve(ARCHIVE_DIR_NAME).resolve(claimId);
@@ -859,22 +1070,28 @@ public class FileSystemRepository implements ContentRepository {
             return true;
         }
 
-        return Files.exists(getArchivePath(contentClaim));
+        return Files.exists(getArchivePath(contentClaim.getResourceClaim()));
     }
 
-    private void archive(final ContentClaim contentClaim) throws IOException {
+    private void archive(final ResourceClaim claim) throws IOException {
         if (!archiveData) {
             return;
         }
 
-        final int claimantCount = getClaimantCount(contentClaim);
-        if (claimantCount > 0) {
-            throw new IllegalStateException("Cannot archive ContentClaim " + contentClaim + " because it is currently in use");
+        synchronized (writableClaimQueue) {
+            final int claimantCount = claim == null ? 0 : resourceClaimManager.getClaimantCount(claim);
+            if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+                return;
+            }
+        }
+
+        final Path curPath = getPath(claim);
+        if (curPath == null) {
+            return;
         }
 
-        final Path curPath = getPath(contentClaim, true);
         archive(curPath);
-        LOG.debug("Successfully moved {} to archive", contentClaim);
+        LOG.debug("Successfully moved {} to archive", claim);
     }
 
     private void archive(final Path curPath) throws IOException {
@@ -1107,8 +1324,8 @@ public class FileSystemRepository implements ContentRepository {
                 while (true) {
                     // look through each of the binned queues of Content Claims
                     int successCount = 0;
-                    final List<ContentClaim> toRemove = new ArrayList<>();
-                    for (final Map.Entry<String, BlockingQueue<ContentClaim>> entry : reclaimable.entrySet()) {
+                    final List<ResourceClaim> toRemove = new ArrayList<>();
+                    for (final Map.Entry<String, BlockingQueue<ResourceClaim>> entry : reclaimable.entrySet()) {
                         // drain the queue of all ContentClaims that can be destroyed for the given container.
                         final String container = entry.getKey();
                         final ContainerState containerState = containerStateMap.get(container);
@@ -1121,7 +1338,7 @@ public class FileSystemRepository implements ContentRepository {
 
                         // destroy each claim for this container
                         final long start = System.nanoTime();
-                        for (final ContentClaim claim : toRemove) {
+                        for (final ResourceClaim claim : toRemove) {
                             if (archiveData) {
                                 try {
                                     archive(claim);
@@ -1210,7 +1427,7 @@ public class FileSystemRepository implements ContentRepository {
         @Override
         public void run() {
             try {
-                if (oldestArchiveDate.get() > (System.currentTimeMillis() - maxArchiveMillis)) {
+                if (oldestArchiveDate.get() > System.currentTimeMillis() - maxArchiveMillis) {
                     final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName);
                     if (minRequiredSpace == null) {
                         return;
@@ -1245,7 +1462,7 @@ public class FileSystemRepository implements ContentRepository {
                 if (oldestContainerArchive < 0L) {
                     boolean updated;
                     do {
-                        long oldest = oldestArchiveDate.get();
+                        final long oldest = oldestArchiveDate.get();
                         if (oldestContainerArchive < oldest) {
                             updated = oldestArchiveDate.compareAndSet(oldest, oldestContainerArchive);
 
@@ -1298,7 +1515,7 @@ public class FileSystemRepository implements ContentRepository {
                     final long free = getContainerUsableSpace(containerName);
                     used = capacity - free;
                     bytesUsed = used;
-                } catch (IOException e) {
+                } catch (final IOException e) {
                     return false;
                 }
             }
@@ -1317,7 +1534,7 @@ public class FileSystemRepository implements ContentRepository {
                     try {
                         LOG.info("Unable to write to container {} due to archive file size constraints; waiting for archive cleanup", containerName);
                         condition.await();
-                    } catch (InterruptedException e) {
+                    } catch (final InterruptedException e) {
                     }
                 }
             } finally {
@@ -1355,4 +1572,55 @@ public class FileSystemRepository implements ContentRepository {
         }
     }
 
+
+    private static class ClaimLengthPair {
+        private final ResourceClaim claim;
+        private final Long length;
+
+        public ClaimLengthPair(final ResourceClaim claim, final Long length) {
+            this.claim = claim;
+            this.length = length;
+        }
+
+        public ResourceClaim getClaim() {
+            return claim;
+        }
+
+        public Long getLength() {
+            return length;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (claim == null ? 0 : claim.hashCode());
+            return result;
+        }
+
+        /**
+         * Equality is determined purely by the ResourceClaim's equality
+         *
+         * @param obj the object to compare against
+         * @return -1, 0, or +1 according to the contract of Object.equals
+         */
+        @Override
+        public boolean equals(final Object obj) {
+            if (this == obj) {
+                return true;
+            }
+
+            if (obj == null) {
+                return false;
+            }
+
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+
+            final ClaimLengthPair other = (ClaimLengthPair) obj;
+            return claim.equals(other.getClaim());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
index 6524cd3..cc8c734 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
@@ -95,7 +95,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
 
     @Override
     public boolean isPenalized() {
-        return (penaltyExpirationMs > 0) ? penaltyExpirationMs > System.currentTimeMillis() : false;
+        return penaltyExpirationMs > 0 ? penaltyExpirationMs > System.currentTimeMillis() : false;
     }
 
     @Override
@@ -150,7 +150,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
     public String toString() {
         final ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
         builder.append("uuid", getAttribute(CoreAttributes.UUID.key()));
-        builder.append("claim", claim == null ? "" : claim.getId());
+        builder.append("claim", claim == null ? "" : claim.toString());
         builder.append("offset", claimOffset);
         builder.append("name", getAttribute(CoreAttributes.FILENAME.key())).append("size", size);
         return builder.toString();
@@ -169,7 +169,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
         private final Set<String> bLineageIdentifiers = new HashSet<>();
         private long bPenaltyExpirationMs = -1L;
         private long bSize = 0L;
-        private Map<String, String> bAttributes = new HashMap<>();
+        private final Map<String, String> bAttributes = new HashMap<>();
         private ContentClaim bClaim = null;
         private long bClaimOffset = 0L;
         private long bLastQueueDate = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 04e819e..c245424 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -44,6 +44,7 @@ import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
 import org.apache.nifi.controller.repository.io.ByteCountingOutputStream;
 import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
@@ -54,10 +55,6 @@ import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.controller.repository.io.LongHolder;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.NonCloseableInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.QueueSize;
@@ -75,7 +72,8 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,10 +90,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     // determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback
     public static final int VERBOSE_LOG_THRESHOLD = 10;
-    private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize(
-        NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue();
-    private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim();
-
     public static final String DEFAULT_FLOWFILE_PATH = "./";
 
     private static final Logger LOG = LoggerFactory.getLogger(StandardProcessSession.class);
@@ -124,12 +118,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     private final LongHolder bytesWritten = new LongHolder(0L);
     private int flowFilesIn = 0, flowFilesOut = 0;
     private long contentSizeIn = 0L, contentSizeOut = 0L;
-    private int writeRecursionLevel = 0;
-
-    private ContentClaim currentWriteClaim = null;
-    private OutputStream currentWriteClaimStream = null;
-    private long currentWriteClaimSize = 0L;
-    private int currentWriteClaimFlowFileCount = 0;
 
     private ContentClaim currentReadClaim = null;
     private ByteCountingInputStream currentReadClaimStream = null;
@@ -690,12 +678,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         if (originalClaim == null) {
             builder.setCurrentContentClaim(null, null, null, null, 0L);
         } else {
+            final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
             builder.setCurrentContentClaim(
-                originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
-                );
+                resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+                repoRecord.getOriginal().getContentClaimOffset() + originalClaim.getOffset(), repoRecord.getOriginal().getSize());
             builder.setPreviousContentClaim(
-                originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
-                );
+                resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+                repoRecord.getOriginal().getContentClaimOffset() + originalClaim.getOffset(), repoRecord.getOriginal().getSize());
         }
     }
 
@@ -711,14 +700,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final ContentClaim currentClaim = repoRecord.getCurrentClaim();
             final long currentOffset = repoRecord.getCurrentClaimOffset();
             final long size = flowFile.getSize();
-            recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size);
+
+            final ResourceClaim resourceClaim = currentClaim.getResourceClaim();
+            recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size);
         }
 
         if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
             final ContentClaim originalClaim = repoRecord.getOriginalClaim();
             final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
             final long originalSize = repoRecord.getOriginal().getSize();
-            recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize);
+
+            final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
+            recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), originalOffset + originalClaim.getOffset(), originalSize);
         }
 
         final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
@@ -741,14 +734,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 final ContentClaim currentClaim = repoRecord.getCurrentClaim();
                 final long currentOffset = repoRecord.getCurrentClaimOffset();
                 final long size = eventFlowFile.getSize();
-                recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size);
+
+                final ResourceClaim resourceClaim = currentClaim.getResourceClaim();
+                recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size);
             }
 
             if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
                 final ContentClaim originalClaim = repoRecord.getOriginalClaim();
                 final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
                 final long originalSize = repoRecord.getOriginal().getSize();
-                recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize);
+
+                final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
+                recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), originalOffset + originalClaim.getOffset(), originalSize);
             }
 
             final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
@@ -1650,8 +1647,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
                             final ContentClaim claim = record.getContentClaim();
                             if (claim != null) {
-                                enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
-                                enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
+                                final ResourceClaim resourceClaim = claim.getResourceClaim();
+                                enriched.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+                                    record.getContentClaimOffset() + claim.getOffset(), record.getSize());
+                                enriched.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+                                    record.getContentClaimOffset() + claim.getOffset(), record.getSize());
                             }
 
                             enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
@@ -1695,7 +1695,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                             StreamUtils.skip(currentReadClaimStream, bytesToSkip);
                         }
 
-                        return new NonCloseableInputStream(currentReadClaimStream);
+                        return new DisableOnCloseInputStream(currentReadClaimStream);
                     }
                 }
 
@@ -1711,7 +1711,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
                 // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
                 // reuse the same InputStream for the next FlowFile
-                return new NonCloseableInputStream(currentReadClaimStream);
+                return new DisableOnCloseInputStream(currentReadClaimStream);
             } else {
                 final InputStream rawInStream = context.getContentRepository().read(claim);
                 StreamUtils.skip(rawInStream, offset);
@@ -1862,30 +1862,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         return newFile;
     }
 
-    private void enforceCurrentWriteClaimState() {
-        if (currentWriteClaimFlowFileCount > MAX_FLOWFILES_PER_CLAIM || currentWriteClaimSize > MAX_APPENDABLE_CLAIM_SIZE) {
-            resetWriteClaims();
-        }
-
-        if (currentWriteClaimStream == null) {
-            try {
-                currentWriteClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} to enforce Current Write Claim State for {}", currentWriteClaim, context.getConnectable());
-            } catch (final IOException e) {
-                throw new FlowFileHandlingException("Unable to create ContentClaim due to " + e.toString(), e);
-            }
-
-            try {
-                currentWriteClaimStream = context.getContentRepository().write(currentWriteClaim);
-            } catch (final IOException e) {
-                resetWriteClaims();
-                throw new FlowFileAccessException("Unable to obtain stream for writing to Content Repostiory: " + e, e);
-            }
-        } else {
-            context.getContentRepository().incrementClaimaintCount(currentWriteClaim);
-        }
-    }
-
     private void ensureNotAppending(final ContentClaim claim) throws IOException {
         if (claim == null) {
             return;
@@ -1905,78 +1881,38 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
         long newSize = 0L;
-        long claimOffset = 0L;
+        final long claimOffset = 0L;
 
         ContentClaim newClaim = null;
         final LongHolder writtenHolder = new LongHolder(0L);
-        final boolean appendToClaim = isMergeContent();
         try {
-            if (appendToClaim) {
-                enforceCurrentWriteClaimState();
-                claimOffset = currentWriteClaimSize;
-                newClaim = currentWriteClaim;
-                ensureNotAppending(newClaim);
-
-                try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream);
-                    final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
-
-                    recursionSet.add(source);
-
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(new FlowFileAccessOutputStream(countingOut, source));
-                    } finally {
-                        writeRecursionLevel--;
-                    }
-                } finally {
-                    recursionSet.remove(source);
-                }
-
-                final long writtenThisCall = writtenHolder.getValue();
-                newSize = writtenThisCall;
-                currentWriteClaimSize += newSize;
-            } else {
-                newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
-
-                ensureNotAppending(newClaim);
-                try (final OutputStream stream = context.getContentRepository().write(newClaim);
-                    final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) {
-                    recursionSet.add(source);
+            newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+            claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
 
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(new FlowFileAccessOutputStream(countingOut, source));
-                    } finally {
-                        writeRecursionLevel--;
-                    }
-                } finally {
-                    recursionSet.remove(source);
-                }
-                newSize = context.getContentRepository().size(newClaim);
+            ensureNotAppending(newClaim);
+            try (final OutputStream stream = context.getContentRepository().write(newClaim);
+                final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
+                final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
+                recursionSet.add(source);
+                writer.process(new FlowFileAccessOutputStream(countingOut, source));
+            } finally {
+                recursionSet.remove(source);
             }
+            newSize = context.getContentRepository().size(newClaim);
         } catch (final ContentNotFoundException nfe) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can remove the claim
-            }
+            resetWriteClaims(); // need to reset write claim before we can remove the claim
             destroyContent(newClaim);
             handleContentNotFound(nfe, record);
         } catch (final FlowFileAccessException ffae) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can remove the claim
-            }
+            resetWriteClaims(); // need to reset write claim before we can remove the claim
             destroyContent(newClaim);
             throw ffae;
         } catch (final IOException ioe) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can remove the claim
-            }
+            resetWriteClaims(); // need to reset write claim before we can remove the claim
             destroyContent(newClaim);
             throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final Throwable t) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can remove the claim
-            }
+            resetWriteClaims(); // need to reset write claim before we can remove the claim
             destroyContent(newClaim);
             throw t;
         } finally {
@@ -2021,13 +1957,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     // wrap our OutputStreams so that the processor cannot close it
                     try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) {
                         recursionSet.add(source);
-
-                        writeRecursionLevel++;
-                        try {
-                            writer.process(new FlowFileAccessOutputStream(disableOnClose, source));
-                        } finally {
-                            writeRecursionLevel--;
-                        }
+                        writer.process(new FlowFileAccessOutputStream(disableOnClose, source));
                     } finally {
                         recursionSet.remove(source);
                     }
@@ -2039,13 +1969,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 // wrap our OutputStreams so that the processor cannot close it
                 try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) {
                     recursionSet.add(source);
-
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(disableOnClose);
-                    } finally {
-                        writeRecursionLevel--;
-                    }
+                    writer.process(disableOnClose);
                 } finally {
                     recursionSet.remove(source);
                 }
@@ -2115,18 +2039,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private void resetWriteClaims() {
-        try {
-            if (currentWriteClaimStream != null) {
-                currentWriteClaimStream.flush();
-                currentWriteClaimStream.close();
-            }
-        } catch (final Exception e) {
-        }
-        currentWriteClaimStream = null;
-        currentWriteClaim = null;
-        currentWriteClaimFlowFileCount = 0;
-        currentWriteClaimSize = 0L;
-
         for (final ByteCountingOutputStream out : appendableStreams.values()) {
             try {
                 out.flush();
@@ -2148,15 +2060,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         currentReadClaim = null;
     }
 
-    /**
-     * @return Indicates whether or not multiple FlowFiles should be merged into a single ContentClaim
-     */
-    private boolean isMergeContent() {
-        if (writeRecursionLevel > 0) {
-            return false;
-        }
-        return true;
-    }
 
     @Override
     public FlowFile write(final FlowFile source, final StreamCallback writer) {
@@ -2166,116 +2069,58 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         ContentClaim newClaim = null;
         long newSize = 0L;
-        long claimOffset = 0L;
+        final long claimOffset = 0L;
         final LongHolder writtenHolder = new LongHolder(0L);
-        final boolean appendToClaim = isMergeContent();
         try {
-            if (appendToClaim) {
-                enforceCurrentWriteClaimState();
-                claimOffset = currentWriteClaimSize;
-                newClaim = currentWriteClaim;
-                ensureNotAppending(newClaim);
-
-                try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset());
-                    final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
-                    final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
-                    final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
-                    final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream);
-                    final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
+            newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+            claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
 
-                    recursionSet.add(source);
+            ensureNotAppending(newClaim);
 
-                    // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
-                    // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
-                    // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
-                    // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
-                    // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
-                    final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim);
-                    boolean cnfeThrown = false;
-
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source));
-                    } catch (final ContentNotFoundException cnfe) {
-                        cnfeThrown = true;
-                        throw cnfe;
-                    } finally {
-                        writeRecursionLevel--;
-                        recursionSet.remove(source);
+            try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset());
+                final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
+                final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+                final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
+                final OutputStream os = context.getContentRepository().write(newClaim);
+                final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
+                final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
 
-                        // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
-                        if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
-                            throw ffais.getContentNotFoundException();
-                        }
-                    }
-                }
-
-                final long writtenThisCall = writtenHolder.getValue();
-                newSize = writtenThisCall;
-                currentWriteClaimSize += writtenThisCall;
-            } else {
-                newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
-
-                ensureNotAppending(newClaim);
-
-                try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset());
-                    final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
-                    final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead);
-                    final OutputStream os = context.getContentRepository().write(newClaim);
-                    final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) {
+                recursionSet.add(source);
 
-                    recursionSet.add(source);
+                // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
+                // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
+                // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
+                // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
+                // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
+                final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim);
+                boolean cnfeThrown = false;
 
-                    // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
-                    // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
-                    // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
-                    // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
-                    // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
-                    final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim);
-                    boolean cnfeThrown = false;
-
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source));
-                    } catch (final ContentNotFoundException cnfe) {
-                        cnfeThrown = true;
-                        throw cnfe;
-                    } finally {
-                        writeRecursionLevel--;
-                        recursionSet.remove(source);
+                try {
+                    writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source));
+                } catch (final ContentNotFoundException cnfe) {
+                    cnfeThrown = true;
+                    throw cnfe;
+                } finally {
+                    recursionSet.remove(source);
 
-                        // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
-                        if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
-                            throw ffais.getContentNotFoundException();
-                        }
+                    // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
+                    if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
+                        throw ffais.getContentNotFoundException();
                     }
                 }
-
-                newSize = context.getContentRepository().size(newClaim);
             }
+
+            newSize = context.getContentRepository().size(newClaim);
         } catch (final ContentNotFoundException nfe) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can remove the claim
-            }
             destroyContent(newClaim);
             handleContentNotFound(nfe, record);
         } catch (final IOException ioe) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can remove the claim
-            }
             destroyContent(newClaim);
             throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final FlowFileAccessException ffae) {
-            if (appendToClaim) {
-                resetWriteClaims();
-            }
             destroyContent(newClaim);
             throw ffae;
         } catch (final Throwable t) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can remove the claim
-            }
             destroyContent(newClaim);
             throw t;
         } finally {
@@ -2302,33 +2147,20 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final ContentClaim newClaim;
         final long claimOffset;
 
-        final boolean appendToClaim = isMergeContent();
-        if (appendToClaim) {
-            enforceCurrentWriteClaimState();
-            newClaim = currentWriteClaim;
-            claimOffset = currentWriteClaimSize;
-        } else {
-            try {
-                newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
-            } catch (final IOException e) {
-                throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
-            }
-
-            claimOffset = 0L;
+        try {
+            newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+            claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
+        } catch (final IOException e) {
+            throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
         }
 
+        claimOffset = 0L;
         long newSize = 0L;
         try {
-            final boolean append = isMergeContent();
-            newSize = context.getContentRepository().importFrom(source, newClaim, append);
+            newSize = context.getContentRepository().importFrom(source, newClaim, false);
             bytesWritten.increment(newSize);
             bytesRead.increment(newSize);
-            currentWriteClaimSize += newSize;
         } catch (final Throwable t) {
-            if (appendToClaim) {
-                resetWriteClaims();
-            }
             destroyContent(newClaim);
             throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
         }
@@ -2351,40 +2183,24 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         validateRecordState(destination);
         final StandardRepositoryRecord record = records.get(destination);
         ContentClaim newClaim = null;
-        long claimOffset = 0L;
+        final long claimOffset = 0L;
 
         final long newSize;
-        final boolean appendToClaim = isMergeContent();
         try {
-            if (appendToClaim) {
-                enforceCurrentWriteClaimState();
-                newClaim = currentWriteClaim;
-                claimOffset = currentWriteClaimSize;
-
-                final long bytesCopied = StreamUtils.copy(source, currentWriteClaimStream);
-                bytesWritten.increment(bytesCopied);
-                currentWriteClaimSize += bytesCopied;
-                newSize = bytesCopied;
-            } else {
-                try {
-                    newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                    claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
+            try {
+                newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+                claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
 
-                    newSize = context.getContentRepository().importFrom(source, newClaim, appendToClaim);
-                    bytesWritten.increment(newSize);
-                    currentWriteClaimSize += newSize;
-                } catch (final IOException e) {
-                    throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
-                }
+                newSize = context.getContentRepository().importFrom(source, newClaim, false);
+                bytesWritten.increment(newSize);
+            } catch (final IOException e) {
+                throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
             }
         } catch (final Throwable t) {
-            if (appendToClaim) {
-                resetWriteClaims();
-            }
-
             if (newClaim != null) {
                 destroyContent(newClaim);
             }
+            
             throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 3bfdd8a..6c1626c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -27,8 +27,10 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -38,7 +40,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream;
 import org.apache.nifi.controller.repository.io.MemoryManager;
@@ -92,7 +95,7 @@ public class VolatileContentRepository implements ContentRepository {
     private final ConcurrentMap<ContentClaim, ContentClaim> backupRepoClaimMap = new ConcurrentHashMap<>(256);
     private final AtomicReference<ContentRepository> backupRepositoryRef = new AtomicReference<>(null);
 
-    private ContentClaimManager claimManager; // effectively final
+    private ResourceClaimManager claimManager; // effectively final
 
     public VolatileContentRepository() {
         this(NiFiProperties.getInstance());
@@ -119,7 +122,7 @@ public class VolatileContentRepository implements ContentRepository {
     }
 
     @Override
-    public void initialize(final ContentClaimManager claimManager) {
+    public void initialize(final ResourceClaimManager claimManager) {
         this.claimManager = claimManager;
 
         for (int i = 0; i < 3; i++) {
@@ -199,9 +202,10 @@ public class VolatileContentRepository implements ContentRepository {
 
     private ContentClaim createLossTolerant() {
         final long id = idGenerator.getAndIncrement();
-        final ContentClaim claim = claimManager.newContentClaim(CONTAINER_NAME, "section", String.valueOf(id), true);
+        final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true);
+        final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L);
         final ContentBlock contentBlock = new ContentBlock(claim, repoSize);
-        claimManager.incrementClaimantCount(claim, true);
+        claimManager.incrementClaimantCount(resourceClaim, true);
 
         claimMap.put(claim, contentBlock);
 
@@ -216,7 +220,7 @@ public class VolatileContentRepository implements ContentRepository {
         }
         final ContentClaim backupClaim = getBackupClaim(claim);
         if (backupClaim == null) {
-            return claimManager.incrementClaimantCount(resolveClaim(claim));
+            return claimManager.incrementClaimantCount(resolveClaim(claim).getResourceClaim());
         } else {
             return getBackupRepository().incrementClaimaintCount(backupClaim);
         }
@@ -230,7 +234,7 @@ public class VolatileContentRepository implements ContentRepository {
 
         final ContentClaim backupClaim = getBackupClaim(claim);
         if (backupClaim == null) {
-            return claimManager.decrementClaimantCount(resolveClaim(claim));
+            return claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim());
         } else {
             return getBackupRepository().decrementClaimantCount(backupClaim);
         }
@@ -244,7 +248,7 @@ public class VolatileContentRepository implements ContentRepository {
 
         final ContentClaim backupClaim = getBackupClaim(claim);
         if (backupClaim == null) {
-            return claimManager.getClaimantCount(resolveClaim(claim));
+            return claimManager.getClaimantCount(resolveClaim(claim).getResourceClaim());
         } else {
             return getBackupRepository().getClaimantCount(backupClaim);
         }
@@ -273,6 +277,29 @@ public class VolatileContentRepository implements ContentRepository {
         return true;
     }
 
+    private boolean remove(final ResourceClaim claim) {
+        if (claim == null) {
+            return false;
+        }
+
+        final Set<ContentClaim> contentClaims = new HashSet<>();
+        for (final Map.Entry<ContentClaim, ContentBlock> entry : claimMap.entrySet()) {
+            final ContentClaim contentClaim = entry.getKey();
+            if (contentClaim.getResourceClaim().equals(claim)) {
+                contentClaims.add(contentClaim);
+            }
+        }
+
+        boolean removed = false;
+        for (final ContentClaim contentClaim : contentClaims) {
+            if (remove(contentClaim)) {
+                removed = true;
+            }
+        }
+
+        return removed;
+    }
+
     @Override
     public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException {
         final ContentClaim createdClaim = create(lossTolerant);
@@ -435,7 +462,7 @@ public class VolatileContentRepository implements ContentRepository {
     @Override
     public void purge() {
         for (final ContentClaim claim : claimMap.keySet()) {
-            claimManager.decrementClaimantCount(resolveClaim(claim));
+            claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim());
             final ContentClaim backup = getBackupClaim(claim);
             if (backup != null) {
                 getBackupRepository().remove(backup);
@@ -624,7 +651,7 @@ public class VolatileContentRepository implements ContentRepository {
 
         @Override
         public void run() {
-            final List<ContentClaim> destructable = new ArrayList<>(1000);
+            final List<ResourceClaim> destructable = new ArrayList<>(1000);
             while (true) {
                 destructable.clear();
                 claimManager.drainDestructableClaims(destructable, 1000, 5, TimeUnit.SECONDS);
@@ -632,7 +659,7 @@ public class VolatileContentRepository implements ContentRepository {
                     return;
                 }
 
-                for (final ContentClaim claim : destructable) {
+                for (final ResourceClaim claim : destructable) {
                     remove(claim);
                 }
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index fe34fe0..a85b23b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -22,7 +22,9 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 /**
  * <p>
@@ -32,10 +34,10 @@ import org.apache.nifi.controller.repository.claim.ContentClaimManager;
 public class VolatileFlowFileRepository implements FlowFileRepository {
 
     private final AtomicLong idGenerator = new AtomicLong(0L);
-    private ContentClaimManager claimManager; // effectively final
+    private ResourceClaimManager claimManager; // effectively final
 
     @Override
-    public void initialize(final ContentClaimManager claimManager) {
+    public void initialize(final ResourceClaimManager claimManager) {
         this.claimManager = claimManager;
     }
 
@@ -58,23 +60,49 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
     public void close() throws IOException {
     }
 
+    private void markDestructable(final ContentClaim contentClaim) {
+        if (contentClaim == null) {
+            return;
+        }
+
+        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+        if (resourceClaim == null) {
+            return;
+        }
+
+        claimManager.markDestructable(resourceClaim);
+    }
+
+    private int getClaimantCount(final ContentClaim claim) {
+        if (claim == null) {
+            return 0;
+        }
+
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+        if (resourceClaim == null) {
+            return 0;
+        }
+
+        return claimManager.getClaimantCount(resourceClaim);
+    }
+
     @Override
     public void updateRepository(final Collection<RepositoryRecord> records) throws IOException {
         for (final RepositoryRecord record : records) {
             if (record.getType() == RepositoryRecordType.DELETE) {
                 // For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable
-                if (record.getCurrentClaim() != null && claimManager.getClaimantCount(record.getCurrentClaim()) <= 0) {
-                    claimManager.markDestructable(record.getCurrentClaim());
+                if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) {
+                    markDestructable(record.getCurrentClaim());
                 }
 
                 // If the original claim is different than the current claim and the original claim has a claimant count <= 0, mark it as destructable.
-                if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
-                    claimManager.markDestructable(record.getOriginalClaim());
+                if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && getClaimantCount(record.getOriginalClaim()) <= 0) {
+                    markDestructable(record.getOriginalClaim());
                 }
             } else if (record.getType() == RepositoryRecordType.UPDATE) {
                 // if we have an update, and the original is no longer needed, mark original as destructable
-                if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
-                    claimManager.markDestructable(record.getOriginalClaim());
+                if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && getClaimantCount(record.getOriginalClaim()) <= 0) {
+                    markDestructable(record.getOriginalClaim());
                 }
             }
         }