You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/07/29 23:02:20 UTC
[3/4] nifi git commit: NIFI-744: Refactored ContentClaim into
ContentClaim and ResourceClaim so that we can append to a single file in the
FileSystemRepository even after a session is completed
NIFI-744: Refactored ContentClaim into ContentClaim and ResourceClaim so that we can append to a single file in the FileSystemRepository even after a session is completed
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9f32074c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9f32074c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9f32074c
Branch: refs/heads/NIFI-744
Commit: 9f32074cc692f803f299722fee2289f27d339237
Parents: 75ed16c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jul 29 15:59:25 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jul 29 16:30:07 2015 -0400
----------------------------------------------------------------------
.../repository/ContentRepository.java | 4 +-
.../repository/FlowFileRepository.java | 4 +-
.../repository/FlowFileSwapManager.java | 6 +-
.../repository/claim/ContentClaim.java | 31 +-
.../repository/claim/ContentClaimManager.java | 142 -----
.../repository/claim/ResourceClaim.java | 54 ++
.../repository/claim/ResourceClaimManager.java | 135 +++++
.../stream/io/ByteCountingOutputStream.java | 4 +
.../nifi-framework/nifi-framework-core/pom.xml | 1 +
.../nifi/controller/FileSystemSwapManager.java | 63 ++-
.../apache/nifi/controller/FlowController.java | 168 +++---
.../repository/FileSystemRepository.java | 566 ++++++++++++++-----
.../repository/StandardFlowFileRecord.java | 6 +-
.../repository/StandardProcessSession.java | 368 +++---------
.../repository/VolatileContentRepository.java | 49 +-
.../repository/VolatileFlowFileRepository.java | 46 +-
.../WriteAheadFlowFileRepository.java | 101 +++-
.../repository/claim/StandardContentClaim.java | 133 ++---
.../claim/StandardContentClaimManager.java | 145 -----
.../repository/claim/StandardResourceClaim.java | 134 +++++
.../claim/StandardResourceClaimManager.java | 145 +++++
.../controller/TestFileSystemSwapManager.java | 24 +-
.../repository/TestFileSystemRepository.java | 192 ++++++-
.../repository/TestStandardProcessSession.java | 378 ++++---------
.../TestVolatileContentRepository.java | 17 +-
.../TestWriteAheadFlowFileRepository.java | 6 +-
.../src/test/resources/bye.txt | 1 +
27 files changed, 1648 insertions(+), 1275 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index ee3ead9..da87d75 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -24,7 +24,7 @@ import java.util.Collection;
import java.util.Set;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
/**
* Defines the capabilities of a content repository. Append options are not
@@ -42,7 +42,7 @@ public interface ContentRepository {
* @param claimManager to handle claims
* @throws java.io.IOException if unable to init
*/
- void initialize(ContentClaimManager claimManager) throws IOException;
+ void initialize(ResourceClaimManager claimManager) throws IOException;
/**
* Shuts down the Content Repository, freeing any resources that may be
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 5e59e04..58fc6b3 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -22,7 +22,7 @@ import java.util.Collection;
import java.util.List;
import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
/**
* Implementations must be thread safe
@@ -38,7 +38,7 @@ public interface FlowFileRepository extends Closeable {
* @param claimManager for handling claims
* @throws java.io.IOException if unable to initialize repository
*/
- void initialize(ContentClaimManager claimManager) throws IOException;
+ void initialize(ResourceClaimManager claimManager) throws IOException;
/**
* @return the maximum number of bytes that can be stored in the underlying
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 869e2b3..2e5be11 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.controller.repository;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
/**
@@ -38,7 +38,7 @@ public interface FlowFileSwapManager {
* @param reporter the EventReporter that can be used for notifying users of
* important events
*/
- void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter);
+ void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ResourceClaimManager claimManager, EventReporter reporter);
/**
* Shuts down the manager
@@ -59,5 +59,5 @@ public interface FlowFileSwapManager {
* @param claimManager manager
* @return how many flowfiles have been recovered
*/
- long recoverSwappedFlowFiles(QueueProvider connectionProvider, ContentClaimManager claimManager);
+ long recoverSwappedFlowFiles(QueueProvider connectionProvider, ResourceClaimManager claimManager);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
index 53cc44f..5c1d76b 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
@@ -18,35 +18,30 @@ package org.apache.nifi.controller.repository.claim;
/**
* <p>
- * A ContentClaim is a reference to a given flow file's content. Multiple flow
- * files may reference the same content by both having the same content
- * claim.</p>
+ * A reference to a section of a {@link ResourceClaim}, which may or may not encompass
+ * the entire ResourceClaim. Multiple FlowFiles may reference the same content by both
+ * having the same content claim.
+ * </p>
*
* <p>
- * Must be thread safe</p>
- *
+ * Must be thread safe
+ * </p>
*/
public interface ContentClaim extends Comparable<ContentClaim> {
/**
- * @return the unique identifier for this claim
- */
- String getId();
-
- /**
- * @return the container identifier in which this claim is held
+ * @return the ResourceClaim that this ContentClaim references
*/
- String getContainer();
+ ResourceClaim getResourceClaim();
/**
- * @return the section within a given container the claim is held
+ * @return the offset into the ResourceClaim where the content for this
+ * claim begins
*/
- String getSection();
+ long getOffset();
/**
- * @return Indicates whether or not the Claim is loss-tolerant. If so, we will
- * attempt to keep the content but will not sacrifice a great deal of
- * performance to do so
+ * @return the length of this ContentClaim
*/
- boolean isLossTolerant();
+ long getLength();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
deleted file mode 100644
index bffcec3..0000000
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.claim;
-
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Responsible for managing all ContentClaims that are used in the application
- */
-public interface ContentClaimManager {
-
- /**
- * Creates a new Content Claim with the given id, container, section, and
- * loss tolerance.
- *
- * @param id of claim
- * @param container of claim
- * @param section of claim
- * @param lossTolerant of claim
- * @return new claim
- */
- ContentClaim newContentClaim(String container, String section, String id, boolean lossTolerant);
-
- /**
- * @param claim to obtain reference count for
- * @return the number of FlowFiles that hold a claim to a particular piece
- * of FlowFile content
- */
- int getClaimantCount(ContentClaim claim);
-
- /**
- * Decreases by 1 the count of how many FlowFiles hold a claim to a
- * particular piece of FlowFile content and returns the new count
- *
- * @param claim to decrement claimants on
- * @return new claimaint count
- */
- int decrementClaimantCount(ContentClaim claim);
-
- /**
- * Increases by 1 the count of how many FlowFiles hold a claim to a
- * particular piece of FlowFile content and returns the new count
- *
- * @param claim to increment claims on
- * @return new claimant count
- */
- int incrementClaimantCount(ContentClaim claim);
-
- /**
- * Increases by 1 the count of how many FlowFiles hold a claim to a
- * particular piece of FlowFile content and returns the new count.
- *
- * If it is known that the Content Claim whose count is being incremented is
- * a newly created ContentClaim, this method should be called with a value
- * of {@code true} as the second argument, as it may allow the manager to
- * optimize its tasks, knowing that the Content Claim cannot be referenced
- * by any other component
- *
- * @param claim to increment
- * @param newClaim provides a hint that no other process can have access to this
- * claim right now
- * @return new claim count
- */
- int incrementClaimantCount(ContentClaim claim, boolean newClaim);
-
- /**
- * Indicates that the given ContentClaim can now be destroyed by the
- * appropriate Content Repository. This should be done only after it is
- * guaranteed that the FlowFile Repository has been synchronized with its
- * underlying storage component. This way, we avoid the following sequence
- * of events:
- * <ul>
- * <li>FlowFile Repository is updated to indicate that FlowFile F no longer
- * depends on ContentClaim C</li>
- * <li>ContentClaim C is no longer needed and is destroyed</li>
- * <li>The Operating System crashes or there is a power failure</li>
- * <li>Upon restart, the FlowFile Repository was not synchronized with its
- * underlying storage mechanism and as such indicates that FlowFile F needs
- * ContentClaim C.</li>
- * <li>Since ContentClaim C has already been destroyed, it is inaccessible,
- * and FlowFile F's Content is not found, so the FlowFile is removed,
- * resulting in data loss.</li>
- * </ul>
- *
- * <p>
- * Using this method of marking the ContentClaim as destructable only when
- * the FlowFile repository has been synced with the underlying storage
- * mechanism, we can ensure that on restart, we will not point to this
- * unneeded claim. As such, it is now safe to destroy the contents.
- * </p>
- *
- * @param claim to mark as now destructable
- */
- void markDestructable(ContentClaim claim);
-
- /**
- * Drains up to {@code maxElements} Content Claims from the internal queue
- * of destructable content claims to the given {@code destination} so that
- * they can be destroyed.
- *
- * @param destination to drain to
- * @param maxElements max items to drain
- */
- void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements);
-
- /**
- * Drains up to {@code maxElements} Content Claims from the internal queue
- * of destructable content claims to the given {@code destination} so that
- * they can be destroyed. If no ContentClaim is ready to be destroyed at
- * this time, will wait up to the specified amount of time before returning.
- * If, after the specified amount of time, there is still no ContentClaim
- * ready to be destroyed, the method will return without having added
- * anything to the given {@code destination}.
- *
- * @param destination to drain to
- * @param maxElements max items to drain
- * @param timeout maximum time to wait
- * @param unit unit of time to wait
- */
- void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit);
-
- /**
- * Clears the manager's memory of any and all ContentClaims that it knows
- * about
- */
- void purge();
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
new file mode 100644
index 0000000..d448632
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import org.apache.nifi.controller.repository.ContentRepository;
+
+/**
+ * <p>
+ * Represents a resource that can be provided by a {@link ContentRepository}
+ * </p>
+ *
+ * <p>
+ * MUST BE THREAD-SAFE!
+ * </p>
+ */
+public interface ResourceClaim extends Comparable<ResourceClaim> {
+
+ /**
+ * @return the unique identifier for this claim
+ */
+ String getId();
+
+ /**
+ * @return the container identifier in which this claim is held
+ */
+ String getContainer();
+
+ /**
+ * @return the section within a given container the claim is held
+ */
+ String getSection();
+
+ /**
+ * @return Indicates whether or not the Claim is loss-tolerant. If so, we will
+ * attempt to keep the content but will not sacrifice a great deal of
+ * performance to do so
+ */
+ boolean isLossTolerant();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
new file mode 100644
index 0000000..01f4c65
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Responsible for managing all ResourceClaims that are used in the application
+ */
+public interface ResourceClaimManager {
+
+ /**
+ * Creates a new Resource Claim with the given id, container, section, and
+ * loss tolerance.
+ *
+ * @param container of claim
+ * @param section of claim
+ * @param id of claim
+ * @param lossTolerant of claim
+ * @return new claim
+ */
+ ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant);
+
+ /**
+ * @param claim to obtain reference count for
+ * @return the number of FlowFiles that hold a claim to a particular piece
+ * of FlowFile content
+ */
+ int getClaimantCount(ResourceClaim claim);
+
+ /**
+ * Decreases by 1 the count of how many FlowFiles hold a claim to a
+ * particular piece of FlowFile content and returns the new count
+ *
+ * @param claim to decrement claimants on
+ * @return new claimant count
+ */
+ int decrementClaimantCount(ResourceClaim claim);
+
+ /**
+ * Increases by 1 the count of how many FlowFiles hold a claim to a
+ * particular piece of FlowFile content and returns the new count
+ *
+ * @param claim to increment claims on
+ * @return new claimant count
+ */
+ int incrementClaimantCount(ResourceClaim claim);
+
+ /**
+ * Increases by 1 the count of how many FlowFiles hold a claim to a
+ * particular piece of FlowFile content and returns the new count.
+ *
+ * If it is known that the Resource Claim whose count is being incremented is
+ * a newly created ResourceClaim, this method should be called with a value
+ * of {@code true} as the second argument, as it may allow the manager to
+ * optimize its tasks, knowing that the Resource Claim cannot be referenced
+ * by any other component.
+ *
+ * @param claim to increment
+ * @param newClaim provides a hint that no other process can have access to this
+ * claim right now
+ * @return new claim count
+ */
+ int incrementClaimantCount(ResourceClaim claim, boolean newClaim);
+
+ /**
+ * Indicates that the given ResourceClaim can now be destroyed by the
+ * appropriate Content Repository. This should be done only after it is
+ * guaranteed that the FlowFile Repository has been synchronized with its
+ * underlying storage component. This way, we avoid the following sequence
+ * of events:
+ * <ul>
+ * <li>FlowFile Repository is updated to indicate that FlowFile F no longer depends on ResourceClaim C</li>
+ * <li>ResourceClaim C is no longer needed and is destroyed</li>
+ * <li>The Operating System crashes or there is a power failure</li>
+ * <li>Upon restart, the FlowFile Repository was not synchronized with its underlying storage mechanism and as such indicates that FlowFile F needs ResourceClaim C.</li>
+ * <li>Since ResourceClaim C has already been destroyed, it is inaccessible, and FlowFile F's Content is not found, so the FlowFile is removed, resulting in data loss.</li>
+ * </ul>
+ *
+ * <p>
+ * Using this method of marking the ResourceClaim as destructable only when the FlowFile repository has been synced with the underlying storage mechanism, we can ensure that on restart, we will
+ * not point to this unneeded claim. As such, it is now safe to destroy the contents.
+ * </p>
+ *
+ * @param claim to mark as now destructable
+ */
+ void markDestructable(ResourceClaim claim);
+
+ /**
+ * Drains up to {@code maxElements} Resource Claims from the internal queue
+ * of destructable resource claims to the given {@code destination} so that
+ * they can be destroyed.
+ *
+ * @param destination to drain to
+ * @param maxElements max items to drain
+ */
+ void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements);
+
+ /**
+ * Drains up to {@code maxElements} Resource Claims from the internal queue
+ * of destructable resource claims to the given {@code destination} so that
+ * they can be destroyed. If no ResourceClaim is ready to be destroyed at
+ * this time, will wait up to the specified amount of time before returning.
+ * If, after the specified amount of time, there is still no ResourceClaim
+ * ready to be destroyed, the method will return without having added
+ * anything to the given {@code destination}.
+ *
+ * @param destination to drain to
+ * @param maxElements max items to drain
+ * @param timeout maximum time to wait
+ * @param unit unit of time to wait
+ */
+ void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit);
+
+ /**
+ * Clears the manager's memory of any and all ResourceClaims that it knows
+ * about
+ */
+ void purge();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
index 9bbd45e..47f236d 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
@@ -63,4 +63,8 @@ public class ByteCountingOutputStream extends OutputStream {
public void close() throws IOException {
out.close();
}
+
+ public OutputStream getWrappedStream() {
+ return out;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 8d64143..f48988a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -135,6 +135,7 @@
<exclude>src/test/resources/conf/0bytes.xml</exclude>
<exclude>src/test/resources/conf/termination-only.xml</exclude>
<exclude>src/test/resources/hello.txt</exclude>
+ <exclude>src/test/resources/bye.txt</exclude>
<exclude>src/test/resources/old-swap-file.swap</exclude>
</excludes>
</configuration>
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 604dba9..c829566 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -61,7 +61,9 @@ import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.QueueProvider;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.stream.io.BufferedOutputStream;
@@ -83,7 +85,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
- public static final int SWAP_ENCODING_VERSION = 6;
+ public static final int SWAP_ENCODING_VERSION = 7;
public static final String EVENT_CATEGORY = "Swap FlowFiles";
private final ScheduledExecutorService swapQueueIdentifierExecutor;
@@ -98,7 +100,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
private final long swapOutMillis;
private final int swapOutThreadCount;
- private ContentClaimManager claimManager; // effectively final
+ private ResourceClaimManager claimManager; // effectively final
private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
@@ -138,7 +140,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
@Override
- public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) {
+ public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ResourceClaimManager claimManager, final EventReporter eventReporter) {
this.claimManager = claimManager;
this.flowFileRepository = flowFileRepository;
this.eventReporter = eventReporter;
@@ -184,11 +186,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
- out.writeUTF(claim.getId());
- out.writeUTF(claim.getContainer());
- out.writeUTF(claim.getSection());
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ out.writeUTF(resourceClaim.getId());
+ out.writeUTF(resourceClaim.getContainer());
+ out.writeUTF(resourceClaim.getSection());
+ out.writeLong(claim.getOffset());
+ out.writeLong(claim.getLength());
out.writeLong(flowFile.getContentClaimOffset());
- out.writeBoolean(claim.isLossTolerant());
+ out.writeBoolean(resourceClaim.isLossTolerant());
}
final Map<String, String> attributes = flowFile.getAttributes();
@@ -226,7 +231,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
- static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ContentClaimManager claimManager) throws IOException {
+ static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
final int swapEncodingVersion = in.readInt();
if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
@@ -245,7 +250,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, final FlowFileQueue queue,
- final int serializationVersion, final boolean incrementContentClaims, final ContentClaimManager claimManager) throws IOException {
+ final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException {
final List<FlowFileRecord> flowFiles = new ArrayList<>();
for (int i = 0; i < numFlowFiles; i++) {
// legacy encoding had an "action" because it used to be coupled with FlowFile Repository code
@@ -292,6 +297,17 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final String container = in.readUTF();
final String section = in.readUTF();
+
+ final long resourceOffset;
+ final long resourceLength;
+ if (serializationVersion < 6) {
+ resourceOffset = 0L;
+ resourceLength = -1L;
+ } else {
+ resourceOffset = in.readLong();
+ resourceLength = in.readLong();
+ }
+
final long claimOffset = in.readLong();
final boolean lossTolerant;
@@ -301,10 +317,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
lossTolerant = false;
}
- final ContentClaim claim = claimManager.newContentClaim(container, section, claimId, lossTolerant);
+ final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+ final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
+ claim.setLength(resourceLength);
if (incrementContentClaims) {
- claimManager.incrementClaimantCount(claim);
+ claimManager.incrementClaimantCount(resourceClaim);
}
ffBuilder.contentClaim(claim);
@@ -353,16 +371,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
throw new EOFException();
}
if (firstValue == 0xff && secondValue == 0xff) {
- int ch1 = in.read();
- int ch2 = in.read();
- int ch3 = in.read();
- int ch4 = in.read();
+ final int ch1 = in.read();
+ final int ch2 = in.read();
+ final int ch3 = in.read();
+ final int ch4 = in.read();
if ((ch1 | ch2 | ch3 | ch4) < 0) {
throw new EOFException();
}
- return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
+ return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
} else {
- return ((firstValue << 8) + (secondValue));
+ return (firstValue << 8) + secondValue;
}
}
@@ -422,7 +440,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final FlowFileQueue flowFileQueue = entry.getKey();
// if queue is more than 60% of its swap threshold, don't swap flowfiles in
- if (flowFileQueue.unswappedSize() >= ((float) flowFileQueue.getSwapThreshold() * 0.6F)) {
+ if (flowFileQueue.unswappedSize() >= flowFileQueue.getSwapThreshold() * 0.6F) {
continue;
}
@@ -432,7 +450,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final Queue<File> queue = queueLockWrapper.getQueue();
// Swap FlowFiles in until we hit 90% of the threshold, or until we're out of files.
- while (flowFileQueue.unswappedSize() < ((float) flowFileQueue.getSwapThreshold() * 0.9F)) {
+ while (flowFileQueue.unswappedSize() < flowFileQueue.getSwapThreshold() * 0.9F) {
File swapFile = null;
try {
swapFile = queue.poll();
@@ -545,7 +563,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
QueueLockWrapper swapQueue = swapMap.get(flowFileQueue);
if (swapQueue == null) {
swapQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>());
- QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue);
+ final QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue);
if (oldQueue != null) {
swapQueue = oldQueue;
}
@@ -567,7 +585,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
* @return the largest FlowFile ID that was recovered
*/
@Override
- public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ContentClaimManager claimManager) {
+ public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ResourceClaimManager claimManager) {
final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
@Override
public boolean accept(final File dir, final String name) {
@@ -680,6 +698,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
+ @Override
public void shutdown() {
swapQueueIdentifierExecutor.shutdownNow();
swapInExecutor.shutdownNow();
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 3d78b3a..af99d50 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -98,9 +98,12 @@ import org.apache.nifi.controller.repository.StandardCounterRepository;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
import org.apache.nifi.controller.repository.claim.ContentDirection;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
@@ -176,7 +179,6 @@ import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
@@ -282,7 +284,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final NodeProtocolSender protocolSender;
private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
- private final ContentClaimManager contentClaimManager = new StandardContentClaimManager();
+ private final ResourceClaimManager contentClaimManager = new StandardResourceClaimManager();
// guarded by rwLock
/**
@@ -495,7 +497,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
}
- private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ContentClaimManager contentClaimManager) {
+ private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) {
final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: "
@@ -3108,7 +3110,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public boolean isInputAvailable() {
try {
- return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier()));
+ return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(),
+ event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset()));
} catch (final IOException e) {
return false;
}
@@ -3117,43 +3120,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public boolean isOutputAvailable() {
try {
- return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(), event.getContentClaimIdentifier()));
+ return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(),
+ event.getContentClaimIdentifier(), event.getContentClaimOffset()));
} catch (final IOException e) {
return false;
}
}
- private ContentClaim createClaim(final String container, final String section, final String identifier) {
+ private ContentClaim createClaim(final String container, final String section, final String identifier, final Long offset) {
if (container == null || section == null || identifier == null) {
return null;
}
- return new ContentClaim() {
- @Override
- public int compareTo(final ContentClaim o) {
- return 0;
- }
-
- @Override
- public String getId() {
- return identifier;
- }
-
- @Override
- public String getContainer() {
- return container;
- }
-
- @Override
- public String getSection() {
- return section;
- }
-
- @Override
- public boolean isLossTolerant() {
- return false;
- }
- };
+ final StandardResourceClaim resourceClaim = new StandardResourceClaim(container, section, identifier, false);
+ return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue());
}
@Override
@@ -3170,45 +3150,48 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
requireNonNull(requestUri);
final ContentClaim claim;
- final Long offset;
final long size;
+ final long offset;
if (direction == ContentDirection.INPUT) {
if (provEvent.getPreviousContentClaimContainer() == null || provEvent.getPreviousContentClaimSection() == null || provEvent.getPreviousContentClaimIdentifier() == null) {
throw new IllegalArgumentException("Input Content Claim not specified");
}
- claim = contentClaimManager.newContentClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), provEvent.getPreviousContentClaimIdentifier(), false);
- offset = provEvent.getPreviousContentClaimOffset();
+ final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
+ provEvent.getPreviousContentClaimIdentifier(), false);
+ claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
+ offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
size = provEvent.getPreviousFileSize();
} else {
if (provEvent.getContentClaimContainer() == null || provEvent.getContentClaimSection() == null || provEvent.getContentClaimIdentifier() == null) {
throw new IllegalArgumentException("Output Content Claim not specified");
}
- claim = contentClaimManager.newContentClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), provEvent.getContentClaimIdentifier(), false);
- offset = provEvent.getContentClaimOffset();
+ final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
+ provEvent.getContentClaimIdentifier(), false);
+
+ claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
+ offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset();
size = provEvent.getFileSize();
}
final InputStream rawStream = contentRepository.read(claim);
- if (offset != null) {
- StreamUtils.skip(rawStream, offset.longValue());
- }
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
// Register a Provenance Event to indicate that we replayed the data.
final ProvenanceEventRecord sendEvent = new StandardProvenanceEventRecord.Builder()
- .setEventType(ProvenanceEventType.SEND)
- .setFlowFileUUID(provEvent.getFlowFileUuid())
- .setAttributes(provEvent.getAttributes(), Collections.<String, String>emptyMap())
- .setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), offset, size)
- .setTransitUri(requestUri)
- .setEventTime(System.currentTimeMillis())
- .setFlowFileEntryDate(provEvent.getFlowFileEntryDate())
- .setLineageStartDate(provEvent.getLineageStartDate())
- .setComponentType(getName())
- .setComponentId(getRootGroupId())
- .setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId())
- .build();
+ .setEventType(ProvenanceEventType.SEND)
+ .setFlowFileUUID(provEvent.getFlowFileUuid())
+ .setAttributes(provEvent.getAttributes(), Collections.<String, String> emptyMap())
+ .setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, size)
+ .setTransitUri(requestUri)
+ .setEventTime(System.currentTimeMillis())
+ .setFlowFileEntryDate(provEvent.getFlowFileEntryDate())
+ .setLineageStartDate(provEvent.getLineageStartDate())
+ .setComponentType(getName())
+ .setComponentId(getRootGroupId())
+ .setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId())
+ .build();
provenanceEventRepository.registerEvent(sendEvent);
@@ -3233,7 +3216,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
try {
- if (!contentRepository.isAccessible(contentClaimManager.newContentClaim(contentClaimContainer, contentClaimSection, contentClaimId, false))) {
+ final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
+ final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
+
+ if (!contentRepository.isAccessible(contentClaim)) {
return "Content is no longer available in Content Repository";
}
} catch (final IOException ioe) {
@@ -3310,18 +3296,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// Create the ContentClaim
- final ContentClaim claim = contentClaimManager.newContentClaim(event.getPreviousContentClaimContainer(),
+ final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
// Increment Claimant Count, since we will now be referencing the Content Claim
- contentClaimManager.incrementClaimantCount(claim);
+ contentClaimManager.incrementClaimantCount(resourceClaim);
+ final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
+ final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, claimOffset);
+ contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize());
- if (!contentRepository.isAccessible(claim)) {
- contentClaimManager.decrementClaimantCount(claim);
+ if (!contentRepository.isAccessible(contentClaim)) {
+ contentClaimManager.decrementClaimantCount(resourceClaim);
throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository");
}
- final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
final String parentUUID = event.getFlowFileUuid();
// Create the FlowFile Record
@@ -3331,39 +3319,39 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final String newFlowFileUUID = UUID.randomUUID().toString();
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- // Copy relevant info from source FlowFile
- .addAttributes(event.getPreviousAttributes())
- .contentClaim(claim)
- .contentClaimOffset(claimOffset)
- .entryDate(System.currentTimeMillis())
- .id(flowFileRepository.getNextFlowFileSequence())
- .lineageIdentifiers(lineageIdentifiers)
- .lineageStartDate(event.getLineageStartDate())
- .size(contentSize.longValue())
- // Create a new UUID and add attributes indicating that this is a replay
- .addAttribute("flowfile.replay", "true")
- .addAttribute("flowfile.replay.timestamp", String.valueOf(new Date()))
- .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID)
- // remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile
- .removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key())
- // build the record
- .build();
+ // Copy relevant info from source FlowFile
+ .addAttributes(event.getPreviousAttributes())
+ .contentClaim(contentClaim)
+ .contentClaimOffset(0L) // use 0 because we used the content claim offset in the Content Claim itself
+ .entryDate(System.currentTimeMillis())
+ .id(flowFileRepository.getNextFlowFileSequence())
+ .lineageIdentifiers(lineageIdentifiers)
+ .lineageStartDate(event.getLineageStartDate())
+ .size(contentSize.longValue())
+ // Create a new UUID and add attributes indicating that this is a replay
+ .addAttribute("flowfile.replay", "true")
+ .addAttribute("flowfile.replay.timestamp", String.valueOf(new Date()))
+ .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID)
+ // remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile
+ .removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key())
+ // build the record
+ .build();
// Register a Provenance Event to indicate that we replayed the data.
final ProvenanceEventRecord replayEvent = new StandardProvenanceEventRecord.Builder()
- .setEventType(ProvenanceEventType.REPLAY)
- .addChildUuid(newFlowFileUUID)
- .addParentUuid(parentUUID)
- .setFlowFileUUID(parentUUID)
- .setAttributes(Collections.<String, String>emptyMap(), flowFileRecord.getAttributes())
- .setCurrentContentClaim(event.getContentClaimSection(), event.getContentClaimContainer(), event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize())
- .setDetails("Replay requested by " + requestor)
- .setEventTime(System.currentTimeMillis())
- .setFlowFileEntryDate(System.currentTimeMillis())
- .setLineageStartDate(event.getLineageStartDate())
- .setComponentType(event.getComponentType())
- .setComponentId(event.getComponentId())
- .build();
+ .setEventType(ProvenanceEventType.REPLAY)
+ .addChildUuid(newFlowFileUUID)
+ .addParentUuid(parentUUID)
+ .setFlowFileUUID(parentUUID)
+ .setAttributes(Collections.<String, String> emptyMap(), flowFileRecord.getAttributes())
+ .setCurrentContentClaim(event.getContentClaimSection(), event.getContentClaimContainer(), event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize())
+ .setDetails("Replay requested by " + requestor)
+ .setEventTime(System.currentTimeMillis())
+ .setFlowFileEntryDate(System.currentTimeMillis())
+ .setLineageStartDate(event.getLineageStartDate())
+ .setComponentType(event.getComponentType())
+ .setComponentId(event.getComponentId())
+ .build();
provenanceEventRepository.registerEvent(replayEvent);
// Update the FlowFile Repository to indicate that we have added the FlowFile to the flow