You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/07/29 23:02:20 UTC

[3/4] nifi git commit: NIFI-744: Refactored ContentClaim into ContentClaim and ResourceClaim so that we can append to a single file in the FileSystemRepository even after a session is completed

NIFI-744: Refactored ContentClaim into ContentClaim and ResourceClaim so that we can append to a single file in the FileSystemRepository even after a session is completed


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9f32074c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9f32074c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9f32074c

Branch: refs/heads/NIFI-744
Commit: 9f32074cc692f803f299722fee2289f27d339237
Parents: 75ed16c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jul 29 15:59:25 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jul 29 16:30:07 2015 -0400

----------------------------------------------------------------------
 .../repository/ContentRepository.java           |   4 +-
 .../repository/FlowFileRepository.java          |   4 +-
 .../repository/FlowFileSwapManager.java         |   6 +-
 .../repository/claim/ContentClaim.java          |  31 +-
 .../repository/claim/ContentClaimManager.java   | 142 -----
 .../repository/claim/ResourceClaim.java         |  54 ++
 .../repository/claim/ResourceClaimManager.java  | 135 +++++
 .../stream/io/ByteCountingOutputStream.java     |   4 +
 .../nifi-framework/nifi-framework-core/pom.xml  |   1 +
 .../nifi/controller/FileSystemSwapManager.java  |  63 ++-
 .../apache/nifi/controller/FlowController.java  | 168 +++---
 .../repository/FileSystemRepository.java        | 566 ++++++++++++++-----
 .../repository/StandardFlowFileRecord.java      |   6 +-
 .../repository/StandardProcessSession.java      | 368 +++---------
 .../repository/VolatileContentRepository.java   |  49 +-
 .../repository/VolatileFlowFileRepository.java  |  46 +-
 .../WriteAheadFlowFileRepository.java           | 101 +++-
 .../repository/claim/StandardContentClaim.java  | 133 ++---
 .../claim/StandardContentClaimManager.java      | 145 -----
 .../repository/claim/StandardResourceClaim.java | 134 +++++
 .../claim/StandardResourceClaimManager.java     | 145 +++++
 .../controller/TestFileSystemSwapManager.java   |  24 +-
 .../repository/TestFileSystemRepository.java    | 192 ++++++-
 .../repository/TestStandardProcessSession.java  | 378 ++++---------
 .../TestVolatileContentRepository.java          |  17 +-
 .../TestWriteAheadFlowFileRepository.java       |   6 +-
 .../src/test/resources/bye.txt                  |   1 +
 27 files changed, 1648 insertions(+), 1275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index ee3ead9..da87d75 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -24,7 +24,7 @@ import java.util.Collection;
 import java.util.Set;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 /**
  * Defines the capabilities of a content repository. Append options are not
@@ -42,7 +42,7 @@ public interface ContentRepository {
      * @param claimManager to handle claims
      * @throws java.io.IOException if unable to init
      */
-    void initialize(ContentClaimManager claimManager) throws IOException;
+    void initialize(ResourceClaimManager claimManager) throws IOException;
 
     /**
      * Shuts down the Content Repository, freeing any resources that may be

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 5e59e04..58fc6b3 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -22,7 +22,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 /**
  * Implementations must be thread safe
@@ -38,7 +38,7 @@ public interface FlowFileRepository extends Closeable {
      * @param claimManager for handling claims
      * @throws java.io.IOException if unable to initialize repository
      */
-    void initialize(ContentClaimManager claimManager) throws IOException;
+    void initialize(ResourceClaimManager claimManager) throws IOException;
 
     /**
      * @return the maximum number of bytes that can be stored in the underlying

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 869e2b3..2e5be11 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
 
 /**
@@ -38,7 +38,7 @@ public interface FlowFileSwapManager {
      * @param reporter the EventReporter that can be used for notifying users of
      * important events
      */
-    void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter);
+    void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ResourceClaimManager claimManager, EventReporter reporter);
 
     /**
      * Shuts down the manager
@@ -59,5 +59,5 @@ public interface FlowFileSwapManager {
      * @param claimManager manager
      * @return how many flowfiles have been recovered
      */
-    long recoverSwappedFlowFiles(QueueProvider connectionProvider, ContentClaimManager claimManager);
+    long recoverSwappedFlowFiles(QueueProvider connectionProvider, ResourceClaimManager claimManager);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
index 53cc44f..5c1d76b 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
@@ -18,35 +18,30 @@ package org.apache.nifi.controller.repository.claim;
 
 /**
  * <p>
- * A ContentClaim is a reference to a given flow file's content. Multiple flow
- * files may reference the same content by both having the same content
- * claim.</p>
+ * A reference to a section of a {@link ResourceClaim}, which may or may not encompass
+ * the entire ResourceClaim. Multiple FlowFiles may reference the same content by both
+ * having the same content claim.
+ * </p>
  *
  * <p>
- * Must be thread safe</p>
- *
+ * Must be thread safe
+ * </p>
  */
 public interface ContentClaim extends Comparable<ContentClaim> {
 
     /**
-     * @return the unique identifier for this claim
-     */
-    String getId();
-
-    /**
-     * @return the container identifier in which this claim is held
+     * @return the ResourceClaim that this ContentClaim references
      */
-    String getContainer();
+    ResourceClaim getResourceClaim();
 
     /**
-     * @return the section within a given container the claim is held
+     * @return the offset into the ResourceClaim where the content for this
+     * claim begins
      */
-    String getSection();
+    long getOffset();
 
     /**
-     * @return Indicates whether or not the Claim is loss-tolerant. If so, we will
-     * attempt to keep the content but will not sacrifice a great deal of
-     * performance to do so
+     * @return the length of this ContentClaim
      */
-    boolean isLossTolerant();
+    long getLength();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
deleted file mode 100644
index bffcec3..0000000
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.claim;
-
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Responsible for managing all ContentClaims that are used in the application
- */
-public interface ContentClaimManager {
-
-    /**
-     * Creates a new Content Claim with the given id, container, section, and
-     * loss tolerance.
-     *
-     * @param id of claim
-     * @param container of claim
-     * @param section of claim
-     * @param lossTolerant of claim
-     * @return new claim
-     */
-    ContentClaim newContentClaim(String container, String section, String id, boolean lossTolerant);
-
-    /**
-     * @param claim to obtain reference count for
-     * @return the number of FlowFiles that hold a claim to a particular piece
-     * of FlowFile content
-     */
-    int getClaimantCount(ContentClaim claim);
-
-    /**
-     * Decreases by 1 the count of how many FlowFiles hold a claim to a
-     * particular piece of FlowFile content and returns the new count
-     *
-     * @param claim to decrement claimants on
-     * @return new claimaint count
-     */
-    int decrementClaimantCount(ContentClaim claim);
-
-    /**
-     * Increases by 1 the count of how many FlowFiles hold a claim to a
-     * particular piece of FlowFile content and returns the new count
-     *
-     * @param claim to increment claims on
-     * @return new claimant count
-     */
-    int incrementClaimantCount(ContentClaim claim);
-
-    /**
-     * Increases by 1 the count of how many FlowFiles hold a claim to a
-     * particular piece of FlowFile content and returns the new count.
-     *
-     * If it is known that the Content Claim whose count is being incremented is
-     * a newly created ContentClaim, this method should be called with a value
-     * of {@code true} as the second argument, as it may allow the manager to
-     * optimize its tasks, knowing that the Content Claim cannot be referenced
-     * by any other component
-     *
-     * @param claim to increment
-     * @param newClaim provides a hint that no other process can have access to this
-     * claim right now
-     * @return new claim count
-     */
-    int incrementClaimantCount(ContentClaim claim, boolean newClaim);
-
-    /**
-     * Indicates that the given ContentClaim can now be destroyed by the
-     * appropriate Content Repository. This should be done only after it is
-     * guaranteed that the FlowFile Repository has been synchronized with its
-     * underlying storage component. This way, we avoid the following sequence
-     * of events:
-     * <ul>
-     * <li>FlowFile Repository is updated to indicate that FlowFile F no longer
-     * depends on ContentClaim C</li>
-     * <li>ContentClaim C is no longer needed and is destroyed</li>
-     * <li>The Operating System crashes or there is a power failure</li>
-     * <li>Upon restart, the FlowFile Repository was not synchronized with its
-     * underlying storage mechanism and as such indicates that FlowFile F needs
-     * ContentClaim C.</li>
-     * <li>Since ContentClaim C has already been destroyed, it is inaccessible,
-     * and FlowFile F's Content is not found, so the FlowFile is removed,
-     * resulting in data loss.</li>
-     * </ul>
-     *
-     * <p>
-     * Using this method of marking the ContentClaim as destructable only when
-     * the FlowFile repository has been synced with the underlying storage
-     * mechanism, we can ensure that on restart, we will not point to this
-     * unneeded claim. As such, it is now safe to destroy the contents.
-     * </p>
-     *
-     * @param claim to mark as now destructable
-     */
-    void markDestructable(ContentClaim claim);
-
-    /**
-     * Drains up to {@code maxElements} Content Claims from the internal queue
-     * of destructable content claims to the given {@code destination} so that
-     * they can be destroyed.
-     *
-     * @param destination to drain to
-     * @param maxElements max items to drain
-     */
-    void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements);
-
-    /**
-     * Drains up to {@code maxElements} Content Claims from the internal queue
-     * of destructable content claims to the given {@code destination} so that
-     * they can be destroyed. If no ContentClaim is ready to be destroyed at
-     * this time, will wait up to the specified amount of time before returning.
-     * If, after the specified amount of time, there is still no ContentClaim
-     * ready to be destroyed, the method will return without having added
-     * anything to the given {@code destination}.
-     *
-     * @param destination to drain to
-     * @param maxElements max items to drain
-     * @param timeout maximum time to wait
-     * @param unit unit of time to wait
-     */
-    void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit);
-
-    /**
-     * Clears the manager's memory of any and all ContentClaims that it knows
-     * about
-     */
-    void purge();
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
new file mode 100644
index 0000000..d448632
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import org.apache.nifi.controller.repository.ContentRepository;
+
+/**
+ * <p>
+ * Represents a resource that can be provided by a {@link ContentRepository}
+ * </p>
+ *
+ * <p>
+ * MUST BE THREAD-SAFE!
+ * </p>
+ */
+public interface ResourceClaim extends Comparable<ResourceClaim> {
+
+    /**
+     * @return the unique identifier for this claim
+     */
+    String getId();
+
+    /**
+     * @return the container identifier in which this claim is held
+     */
+    String getContainer();
+
+    /**
+     * @return the section within a given container the claim is held
+     */
+    String getSection();
+
+    /**
+     * @return Indicates whether or not the Claim is loss-tolerant. If so, we will
+     *         attempt to keep the content but will not sacrifice a great deal of
+     *         performance to do so
+     */
+    boolean isLossTolerant();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
new file mode 100644
index 0000000..01f4c65
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Responsible for managing all ResourceClaims that are used in the application
+ */
+public interface ResourceClaimManager {
+
+    /**
+     * Creates a new Resource Claim with the given id, container, section, and
+     * loss tolerance.
+     *
+     * @param id of claim
+     * @param container of claim
+     * @param section of claim
+     * @param lossTolerant of claim
+     * @return new claim
+     */
+    ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant);
+
+    /**
+     * @param claim to obtain reference count for
+     * @return the number of FlowFiles that hold a claim to a particular piece
+     * of FlowFile content
+     */
+    int getClaimantCount(ResourceClaim claim);
+
+    /**
+     * Decreases by 1 the count of how many FlowFiles hold a claim to a
+     * particular piece of FlowFile content and returns the new count
+     *
+     * @param claim to decrement claimants on
+     * @return new claimaint count
+     */
+    int decrementClaimantCount(ResourceClaim claim);
+
+    /**
+     * Increases by 1 the count of how many FlowFiles hold a claim to a
+     * particular piece of FlowFile content and returns the new count
+     *
+     * @param claim to increment claims on
+     * @return new claimant count
+     */
+    int incrementClaimantCount(ResourceClaim claim);
+
+    /**
+     * Increases by 1 the count of how many FlowFiles hold a claim to a
+     * particular piece of FlowFile content and returns the new count.
+     *
+     * If it is known that the Content Claim whose count is being incremented is
+     * a newly created ResourceClaim, this method should be called with a value
+     * of {@code true} as the second argument, as it may allow the manager to
+     * optimize its tasks, knowing that the Content Claim cannot be referenced
+     * by any other component
+     *
+     * @param claim to increment
+     * @param newClaim provides a hint that no other process can have access to this
+     *            claim right now
+     * @return new claim count
+     */
+    int incrementClaimantCount(ResourceClaim claim, boolean newClaim);
+
+    /**
+     * Indicates that the given ResourceClaim can now be destroyed by the
+     * appropriate Content Repository. This should be done only after it is
+     * guaranteed that the FlowFile Repository has been synchronized with its
+     * underlying storage component. This way, we avoid the following sequence
+     * of events:
+     * <ul>
+     * <li>FlowFile Repository is updated to indicate that FlowFile F no longer depends on ResourceClaim C</li>
+     * <li>ResourceClaim C is no longer needed and is destroyed</li>
+     * <li>The Operating System crashes or there is a power failure</li>
+     * <li>Upon restart, the FlowFile Repository was not synchronized with its underlying storage mechanism and as such indicates that FlowFile F needs ResourceClaim C.</li>
+     * <li>Since ResourceClaim C has already been destroyed, it is inaccessible, and FlowFile F's Content is not found, so the FlowFile is removed, resulting in data loss.</li>
+     * </ul>
+     *
+     * <p>
+     * Using this method of marking the ResourceClaim as destructable only when the FlowFile repository has been synced with the underlying storage mechanism, we can ensure that on restart, we will
+     * not point to this unneeded claim. As such, it is now safe to destroy the contents.
+     * </p>
+     *
+     * @param claim to mark as now destructable
+     */
+    void markDestructable(ResourceClaim claim);
+
+    /**
+     * Drains up to {@code maxElements} Content Claims from the internal queue
+     * of destructable content claims to the given {@code destination} so that
+     * they can be destroyed.
+     *
+     * @param destination to drain to
+     * @param maxElements max items to drain
+     */
+    void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements);
+
+    /**
+     * Drains up to {@code maxElements} Content Claims from the internal queue
+     * of destructable content claims to the given {@code destination} so that
+     * they can be destroyed. If no ResourceClaim is ready to be destroyed at
+     * this time, will wait up to the specified amount of time before returning.
+     * If, after the specified amount of time, there is still no ResourceClaim
+     * ready to be destroyed, the method will return without having added
+     * anything to the given {@code destination}.
+     *
+     * @param destination to drain to
+     * @param maxElements max items to drain
+     * @param timeout maximum time to wait
+     * @param unit unit of time to wait
+     */
+    void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit);
+
+    /**
+     * Clears the manager's memory of any and all ResourceClaims that it knows
+     * about
+     */
+    void purge();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
index 9bbd45e..47f236d 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
@@ -63,4 +63,8 @@ public class ByteCountingOutputStream extends OutputStream {
     public void close() throws IOException {
         out.close();
     }
+
+    public OutputStream getWrappedStream() {
+        return out;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 8d64143..f48988a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -135,6 +135,7 @@
                         <exclude>src/test/resources/conf/0bytes.xml</exclude>
                         <exclude>src/test/resources/conf/termination-only.xml</exclude>
                         <exclude>src/test/resources/hello.txt</exclude>
+                        <exclude>src/test/resources/bye.txt</exclude>
                         <exclude>src/test/resources/old-swap-file.swap</exclude>
                     </excludes>
                 </configuration>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 604dba9..c829566 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -61,7 +61,9 @@ import org.apache.nifi.controller.repository.FlowFileSwapManager;
 import org.apache.nifi.controller.repository.QueueProvider;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.stream.io.BufferedOutputStream;
@@ -83,7 +85,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
     private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
 
-    public static final int SWAP_ENCODING_VERSION = 6;
+    public static final int SWAP_ENCODING_VERSION = 7;
     public static final String EVENT_CATEGORY = "Swap FlowFiles";
 
     private final ScheduledExecutorService swapQueueIdentifierExecutor;
@@ -98,7 +100,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     private final long swapOutMillis;
     private final int swapOutThreadCount;
 
-    private ContentClaimManager claimManager; // effectively final
+    private ResourceClaimManager claimManager; // effectively final
 
     private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
 
@@ -138,7 +140,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     }
 
     @Override
-    public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) {
+    public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ResourceClaimManager claimManager, final EventReporter eventReporter) {
         this.claimManager = claimManager;
         this.flowFileRepository = flowFileRepository;
         this.eventReporter = eventReporter;
@@ -184,11 +186,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                     out.writeBoolean(false);
                 } else {
                     out.writeBoolean(true);
-                    out.writeUTF(claim.getId());
-                    out.writeUTF(claim.getContainer());
-                    out.writeUTF(claim.getSection());
+                    final ResourceClaim resourceClaim = claim.getResourceClaim();
+                    out.writeUTF(resourceClaim.getId());
+                    out.writeUTF(resourceClaim.getContainer());
+                    out.writeUTF(resourceClaim.getSection());
+                    out.writeLong(claim.getOffset());
+                    out.writeLong(claim.getLength());
                     out.writeLong(flowFile.getContentClaimOffset());
-                    out.writeBoolean(claim.isLossTolerant());
+                    out.writeBoolean(resourceClaim.isLossTolerant());
                 }
 
                 final Map<String, String> attributes = flowFile.getAttributes();
@@ -226,7 +231,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ContentClaimManager claimManager) throws IOException {
+    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
         final int swapEncodingVersion = in.readInt();
         if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
             throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
@@ -245,7 +250,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     }
 
     static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, final FlowFileQueue queue,
-            final int serializationVersion, final boolean incrementContentClaims, final ContentClaimManager claimManager) throws IOException {
+            final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException {
         final List<FlowFileRecord> flowFiles = new ArrayList<>();
         for (int i = 0; i < numFlowFiles; i++) {
             // legacy encoding had an "action" because it used to be couple with FlowFile Repository code
@@ -292,6 +297,17 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
 
                 final String container = in.readUTF();
                 final String section = in.readUTF();
+
+                final long resourceOffset;
+                final long resourceLength;
+                if (serializationVersion < 6) {
+                    resourceOffset = 0L;
+                    resourceLength = -1L;
+                } else {
+                    resourceOffset = in.readLong();
+                    resourceLength = in.readLong();
+                }
+
                 final long claimOffset = in.readLong();
 
                 final boolean lossTolerant;
@@ -301,10 +317,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                     lossTolerant = false;
                 }
 
-                final ContentClaim claim = claimManager.newContentClaim(container, section, claimId, lossTolerant);
+                final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+                final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
+                claim.setLength(resourceLength);
 
                 if (incrementContentClaims) {
-                    claimManager.incrementClaimantCount(claim);
+                    claimManager.incrementClaimantCount(resourceClaim);
                 }
 
                 ffBuilder.contentClaim(claim);
@@ -353,16 +371,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             throw new EOFException();
         }
         if (firstValue == 0xff && secondValue == 0xff) {
-            int ch1 = in.read();
-            int ch2 = in.read();
-            int ch3 = in.read();
-            int ch4 = in.read();
+            final int ch1 = in.read();
+            final int ch2 = in.read();
+            final int ch3 = in.read();
+            final int ch4 = in.read();
             if ((ch1 | ch2 | ch3 | ch4) < 0) {
                 throw new EOFException();
             }
-            return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
+            return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
         } else {
-            return ((firstValue << 8) + (secondValue));
+            return (firstValue << 8) + secondValue;
         }
     }
 
@@ -422,7 +440,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                 final FlowFileQueue flowFileQueue = entry.getKey();
 
                 // if queue is more than 60% of its swap threshold, don't swap flowfiles in
-                if (flowFileQueue.unswappedSize() >= ((float) flowFileQueue.getSwapThreshold() * 0.6F)) {
+                if (flowFileQueue.unswappedSize() >= flowFileQueue.getSwapThreshold() * 0.6F) {
                     continue;
                 }
 
@@ -432,7 +450,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                         final Queue<File> queue = queueLockWrapper.getQueue();
 
                         // Swap FlowFiles in until we hit 90% of the threshold, or until we're out of files.
-                        while (flowFileQueue.unswappedSize() < ((float) flowFileQueue.getSwapThreshold() * 0.9F)) {
+                        while (flowFileQueue.unswappedSize() < flowFileQueue.getSwapThreshold() * 0.9F) {
                             File swapFile = null;
                             try {
                                 swapFile = queue.poll();
@@ -545,7 +563,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                         QueueLockWrapper swapQueue = swapMap.get(flowFileQueue);
                         if (swapQueue == null) {
                             swapQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>());
-                            QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue);
+                            final QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue);
                             if (oldQueue != null) {
                                 swapQueue = oldQueue;
                             }
@@ -567,7 +585,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
      * @return the largest FlowFile ID that was recovered
      */
     @Override
-    public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ContentClaimManager claimManager) {
+    public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ResourceClaimManager claimManager) {
         final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
             @Override
             public boolean accept(final File dir, final String name) {
@@ -680,6 +698,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
+    @Override
     public void shutdown() {
         swapQueueIdentifierExecutor.shutdownNow();
         swapInExecutor.shutdownNow();

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 3d78b3a..af99d50 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -98,9 +98,12 @@ import org.apache.nifi.controller.repository.StandardCounterRepository;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import org.apache.nifi.controller.repository.StandardRepositoryRecord;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
 import org.apache.nifi.controller.scheduling.ProcessContextFactory;
@@ -176,7 +179,6 @@ import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
@@ -282,7 +284,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final NodeProtocolSender protocolSender;
 
     private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
-    private final ContentClaimManager contentClaimManager = new StandardContentClaimManager();
+    private final ResourceClaimManager contentClaimManager = new StandardResourceClaimManager();
 
     // guarded by rwLock
     /**
@@ -495,7 +497,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
     }
 
-    private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ContentClaimManager contentClaimManager) {
+    private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) {
         final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: "
@@ -3108,7 +3110,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             @Override
             public boolean isInputAvailable() {
                 try {
-                    return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier()));
+                    return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(),
+                        event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset()));
                 } catch (final IOException e) {
                     return false;
                 }
@@ -3117,43 +3120,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             @Override
             public boolean isOutputAvailable() {
                 try {
-                    return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(), event.getContentClaimIdentifier()));
+                    return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(),
+                        event.getContentClaimIdentifier(), event.getContentClaimOffset()));
                 } catch (final IOException e) {
                     return false;
                 }
             }
 
-            private ContentClaim createClaim(final String container, final String section, final String identifier) {
+            private ContentClaim createClaim(final String container, final String section, final String identifier, final Long offset) {
                 if (container == null || section == null || identifier == null) {
                     return null;
                 }
 
-                return new ContentClaim() {
-                    @Override
-                    public int compareTo(final ContentClaim o) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return identifier;
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return container;
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return section;
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return false;
-                    }
-                };
+                final StandardResourceClaim resourceClaim = new StandardResourceClaim(container, section, identifier, false);
+                return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue());
             }
 
             @Override
@@ -3170,45 +3150,48 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         requireNonNull(requestUri);
 
         final ContentClaim claim;
-        final Long offset;
         final long size;
+        final long offset;
         if (direction == ContentDirection.INPUT) {
             if (provEvent.getPreviousContentClaimContainer() == null || provEvent.getPreviousContentClaimSection() == null || provEvent.getPreviousContentClaimIdentifier() == null) {
                 throw new IllegalArgumentException("Input Content Claim not specified");
             }
 
-            claim = contentClaimManager.newContentClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), provEvent.getPreviousContentClaimIdentifier(), false);
-            offset = provEvent.getPreviousContentClaimOffset();
+            final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
+                provEvent.getPreviousContentClaimIdentifier(), false);
+            claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
+            offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
             size = provEvent.getPreviousFileSize();
         } else {
             if (provEvent.getContentClaimContainer() == null || provEvent.getContentClaimSection() == null || provEvent.getContentClaimIdentifier() == null) {
                 throw new IllegalArgumentException("Output Content Claim not specified");
             }
 
-            claim = contentClaimManager.newContentClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), provEvent.getContentClaimIdentifier(), false);
-            offset = provEvent.getContentClaimOffset();
+            final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
+                provEvent.getContentClaimIdentifier(), false);
+
+            claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
+            offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset();
             size = provEvent.getFileSize();
         }
 
         final InputStream rawStream = contentRepository.read(claim);
-        if (offset != null) {
-            StreamUtils.skip(rawStream, offset.longValue());
-        }
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
 
         // Register a Provenance Event to indicate that we replayed the data.
         final ProvenanceEventRecord sendEvent = new StandardProvenanceEventRecord.Builder()
-                .setEventType(ProvenanceEventType.SEND)
-                .setFlowFileUUID(provEvent.getFlowFileUuid())
-                .setAttributes(provEvent.getAttributes(), Collections.<String, String>emptyMap())
-                .setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), offset, size)
-                .setTransitUri(requestUri)
-                .setEventTime(System.currentTimeMillis())
-                .setFlowFileEntryDate(provEvent.getFlowFileEntryDate())
-                .setLineageStartDate(provEvent.getLineageStartDate())
-                .setComponentType(getName())
-                .setComponentId(getRootGroupId())
-                .setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId())
-                .build();
+            .setEventType(ProvenanceEventType.SEND)
+            .setFlowFileUUID(provEvent.getFlowFileUuid())
+            .setAttributes(provEvent.getAttributes(), Collections.<String, String> emptyMap())
+            .setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, size)
+            .setTransitUri(requestUri)
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(provEvent.getFlowFileEntryDate())
+            .setLineageStartDate(provEvent.getLineageStartDate())
+            .setComponentType(getName())
+            .setComponentId(getRootGroupId())
+            .setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId())
+            .build();
 
         provenanceEventRepository.registerEvent(sendEvent);
 
@@ -3233,7 +3216,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         try {
-            if (!contentRepository.isAccessible(contentClaimManager.newContentClaim(contentClaimContainer, contentClaimSection, contentClaimId, false))) {
+            final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
+            final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
+
+            if (!contentRepository.isAccessible(contentClaim)) {
                 return "Content is no longer available in Content Repository";
             }
         } catch (final IOException ioe) {
@@ -3310,18 +3296,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         // Create the ContentClaim
-        final ContentClaim claim = contentClaimManager.newContentClaim(event.getPreviousContentClaimContainer(),
+        final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
                 event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
 
         // Increment Claimant Count, since we will now be referencing the Content Claim
-        contentClaimManager.incrementClaimantCount(claim);
+        contentClaimManager.incrementClaimantCount(resourceClaim);
+        final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
+        final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, claimOffset);
+        contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize());
 
-        if (!contentRepository.isAccessible(claim)) {
-            contentClaimManager.decrementClaimantCount(claim);
+        if (!contentRepository.isAccessible(contentClaim)) {
+            contentClaimManager.decrementClaimantCount(resourceClaim);
             throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository");
         }
 
-        final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
         final String parentUUID = event.getFlowFileUuid();
 
         // Create the FlowFile Record
@@ -3331,39 +3319,39 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         final String newFlowFileUUID = UUID.randomUUID().toString();
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                // Copy relevant info from source FlowFile
-                .addAttributes(event.getPreviousAttributes())
-                .contentClaim(claim)
-                .contentClaimOffset(claimOffset)
-                .entryDate(System.currentTimeMillis())
-                .id(flowFileRepository.getNextFlowFileSequence())
-                .lineageIdentifiers(lineageIdentifiers)
-                .lineageStartDate(event.getLineageStartDate())
-                .size(contentSize.longValue())
-                // Create a new UUID and add attributes indicating that this is a replay
-                .addAttribute("flowfile.replay", "true")
-                .addAttribute("flowfile.replay.timestamp", String.valueOf(new Date()))
-                .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID)
-                // remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile
-                .removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key())
-                // build the record
-                .build();
+            // Copy relevant info from source FlowFile
+            .addAttributes(event.getPreviousAttributes())
+            .contentClaim(contentClaim)
+            .contentClaimOffset(0L) // use 0 because we used the content claim offset in the Content Claim itself
+            .entryDate(System.currentTimeMillis())
+            .id(flowFileRepository.getNextFlowFileSequence())
+            .lineageIdentifiers(lineageIdentifiers)
+            .lineageStartDate(event.getLineageStartDate())
+            .size(contentSize.longValue())
+            // Create a new UUID and add attributes indicating that this is a replay
+            .addAttribute("flowfile.replay", "true")
+            .addAttribute("flowfile.replay.timestamp", String.valueOf(new Date()))
+            .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID)
+            // remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile
+            .removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key())
+            // build the record
+            .build();
 
         // Register a Provenance Event to indicate that we replayed the data.
         final ProvenanceEventRecord replayEvent = new StandardProvenanceEventRecord.Builder()
-                .setEventType(ProvenanceEventType.REPLAY)
-                .addChildUuid(newFlowFileUUID)
-                .addParentUuid(parentUUID)
-                .setFlowFileUUID(parentUUID)
-                .setAttributes(Collections.<String, String>emptyMap(), flowFileRecord.getAttributes())
-                .setCurrentContentClaim(event.getContentClaimSection(), event.getContentClaimContainer(), event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize())
-                .setDetails("Replay requested by " + requestor)
-                .setEventTime(System.currentTimeMillis())
-                .setFlowFileEntryDate(System.currentTimeMillis())
-                .setLineageStartDate(event.getLineageStartDate())
-                .setComponentType(event.getComponentType())
-                .setComponentId(event.getComponentId())
-                .build();
+            .setEventType(ProvenanceEventType.REPLAY)
+            .addChildUuid(newFlowFileUUID)
+            .addParentUuid(parentUUID)
+            .setFlowFileUUID(parentUUID)
+            .setAttributes(Collections.<String, String> emptyMap(), flowFileRecord.getAttributes())
+            .setCurrentContentClaim(event.getContentClaimSection(), event.getContentClaimContainer(), event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize())
+            .setDetails("Replay requested by " + requestor)
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(System.currentTimeMillis())
+            .setLineageStartDate(event.getLineageStartDate())
+            .setComponentType(event.getComponentType())
+            .setComponentId(event.getComponentId())
+            .build();
         provenanceEventRepository.registerEvent(replayEvent);
 
         // Update the FlowFile Repository to indicate that we have added the FlowFile to the flow