You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/10/31 14:03:16 UTC
nifi git commit: NIFI-2920 This is a 0.x version of Mark Payne's patch for NIFI-2925
Repository: nifi
Updated Branches:
refs/heads/0.x 05d5fbad6 -> a9395bc67
NIFI-2920 This is a 0.x version of Mark Payne's patch for NIFI-2925
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a9395bc6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a9395bc6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a9395bc6
Branch: refs/heads/0.x
Commit: a9395bc67c2de4fa6ba82ca0573ad210ccc67434
Parents: 05d5fba
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Oct 27 14:31:53 2016 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Oct 31 10:02:51 2016 -0400
----------------------------------------------------------------------
.../repository/claim/ResourceClaimManager.java | 14 ++++-
.../nifi/controller/FileSystemSwapManager.java | 9 +++-
.../apache/nifi/controller/FlowController.java | 11 ++--
.../repository/FileSystemRepository.java | 5 +-
.../repository/VolatileContentRepository.java | 2 +-
.../WriteAheadFlowFileRepository.java | 2 +-
.../claim/StandardResourceClaimManager.java | 54 +++++++++++++++-----
.../controller/TestFileSystemSwapManager.java | 7 ++-
.../repository/TestStandardProcessSession.java | 20 ++++----
.../TestVolatileContentRepository.java | 2 +-
.../TestWriteAheadFlowFileRepository.java | 4 +-
.../claim/TestStandardResourceClaimManager.java | 2 +-
12 files changed, 94 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
index b430df0..68643f9 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -32,9 +32,21 @@ public interface ResourceClaimManager {
* @param container of claim
* @param section of claim
* @param lossTolerant of claim
+ * @param writable whether or not the claim should be made writable
* @return new claim
*/
- ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant);
+ ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable);
+
+ /**
+ * Returns the Resource Claim with the given id, container, and section, if one exists, <code>null</code> otherwise
+ *
+ * @param id of claim
+ * @param container of claim
+ * @param section of claim
+ * @return the existing resource claim or <code>null</code> if none exists
+ */
+ ResourceClaim getResourceClaim(String container, String section, String id);
+
/**
* @param claim to obtain reference count for
http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 156389b..15a90cb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -492,7 +492,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
lossTolerant = false;
}
- resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+ resourceClaim = claimManager.getResourceClaim(container, section, claimId);
+ if (resourceClaim == null) {
+ logger.error("Swap file indicates that FlowFile was referencing Resource Claim at container={}, section={}, claimId={}, "
+ + "but this Resource Claim cannot be found! Will create a temporary Resource Claim, but this may affect the framework's "
+ + "ability to properly clean up this resource", container, section, claimId);
+ resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, true);
+ }
+
final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
claim.setLength(resourceLength);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 3eff44c..0895006 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -3349,7 +3349,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return null;
}
- final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false);
+ final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false, false);
return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue());
}
@@ -3375,7 +3375,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
- provEvent.getPreviousContentClaimIdentifier(), false);
+ provEvent.getPreviousContentClaimIdentifier(), false, false);
+
claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
size = provEvent.getPreviousFileSize();
@@ -3385,7 +3386,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
- provEvent.getContentClaimIdentifier(), false);
+ provEvent.getContentClaimIdentifier(), false, false);
claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset();
@@ -3478,7 +3479,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
try {
- final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
+ final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
if (!contentRepository.isAccessible(contentClaim)) {
@@ -3559,7 +3560,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Create the ContentClaim
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
- event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
+ event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false);
// Increment Claimant Count, since we will now be referencing the Content Claim
resourceClaimManager.incrementClaimantCount(resourceClaim);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 8a3ac6d..d94869f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -426,7 +426,7 @@ public class FileSystemRepository implements ContentRepository {
final String id = idPath.toFile().getName();
final String sectionName = sectionPath.toFile().getName();
- final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false);
+ final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false, false);
if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) {
removeIncompleteContent(fileToRemove);
}
@@ -524,7 +524,7 @@ public class FileSystemRepository implements ContentRepository {
final String section = String.valueOf(modulatedSectionIndex);
final String claimId = System.currentTimeMillis() + "-" + currentIndex;
- resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant);
+ resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant, true);
resourceOffset = 0L;
LOG.debug("Creating new Resource Claim {}", resourceClaim);
@@ -939,6 +939,7 @@ public class FileSystemRepository implements ContentRepository {
LOG.debug("Claim length less than max; Adding {} back to Writable Claim Queue", this);
} else {
writableClaimStreams.remove(scc.getResourceClaim());
+ resourceClaimManager.freeze(scc.getResourceClaim());
bcos.close();
http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 08b7e80..1f75320 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -203,7 +203,7 @@ public class VolatileContentRepository implements ContentRepository {
private ContentClaim createLossTolerant() {
final long id = idGenerator.getAndIncrement();
- final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true);
+ final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true, false);
final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L);
final ContentBlock contentBlock = new ContentBlock(claim, repoSize);
claimManager.incrementClaimantCount(resourceClaim, true);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 27d6c9b..ca52544 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -779,7 +779,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
lossTolerant = false;
}
- final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+ final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false);
final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
contentClaim.setLength(resourceLength);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
index 9cb0fa1..be0e8b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -29,14 +29,25 @@ import org.slf4j.LoggerFactory;
public class StandardResourceClaimManager implements ResourceClaimManager {
- private static final ConcurrentMap<ResourceClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
+ private static final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class);
private static final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
@Override
- public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) {
- return new StandardResourceClaim(this, container, section, id, lossTolerant);
+ public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant, final boolean writable) {
+ final StandardResourceClaim claim = new StandardResourceClaim(this, container, section, id, lossTolerant);
+ if (!writable) {
+ claim.freeze();
+ }
+ return claim;
+ }
+
+ @Override
+ public ResourceClaim getResourceClaim(final String container, final String section, final String id) {
+ final ResourceClaim tempClaim = new StandardResourceClaim(this, container, section, id, false);
+ final ClaimCount count = claimantCounts.get(tempClaim);
+ return (count == null) ? null : count.getClaim();
}
private static AtomicInteger getCounter(final ResourceClaim claim) {
@@ -44,14 +55,14 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
return null;
}
- AtomicInteger counter = claimantCounts.get(claim);
+ ClaimCount counter = claimantCounts.get(claim);
if (counter != null) {
- return counter;
+ return counter.getCount();
}
- counter = new AtomicInteger(0);
- final AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter);
- return existingCounter == null ? counter : existingCounter;
+ counter = new ClaimCount(claim, new AtomicInteger(0));
+ final ClaimCount existingCounter = claimantCounts.putIfAbsent(claim, counter);
+ return existingCounter == null ? counter.getCount() : existingCounter.getCount();
}
@Override
@@ -61,8 +72,8 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
}
synchronized (claim) {
- final AtomicInteger counter = claimantCounts.get(claim);
- return counter == null ? 0 : counter.get();
+ final ClaimCount counter = claimantCounts.get(claim);
+ return counter == null ? 0 : counter.getCount().get();
}
}
@@ -73,13 +84,13 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
}
synchronized (claim) {
- final AtomicInteger counter = claimantCounts.get(claim);
+ final ClaimCount counter = claimantCounts.get(claim);
if (counter == null) {
logger.warn("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim);
return -1;
}
- final int newClaimantCount = counter.decrementAndGet();
+ final int newClaimantCount = counter.getCount().decrementAndGet();
if (newClaimantCount < 0) {
logger.error("Decremented claimant count for {} to {}", claim, newClaimantCount);
} else {
@@ -178,4 +189,23 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
((StandardResourceClaim) claim).freeze();
}
+
+
+ private static final class ClaimCount {
+ private final ResourceClaim claim;
+ private final AtomicInteger count;
+
+ public ClaimCount(final ResourceClaim claim, final AtomicInteger count) {
+ this.claim = claim;
+ this.count = count;
+ }
+
+ public AtomicInteger getCount() {
+ return count;
+ }
+
+ public ResourceClaim getClaim() {
+ return claim;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 2ab8e35..24b61b9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -120,7 +120,12 @@ public class TestFileSystemSwapManager {
public class NopResourceClaimManager implements ResourceClaimManager {
@Override
- public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant) {
+ public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable) {
+ return null;
+ }
+
+ @Override
+ public ResourceClaim getResourceClaim(String container, String section, String id) {
return null;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 2a401c6..71bfd27 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -829,7 +829,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
- .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+ .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.size(1L)
.build();
flowFileQueue.put(flowFileRecord);
@@ -977,7 +977,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
- .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+ .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.size(1L)
.build();
flowFileQueue.put(flowFileRecord);
@@ -1001,7 +1001,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
- .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+ .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.build();
flowFileQueue.put(flowFileRecord);
@@ -1017,7 +1017,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
- .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+ .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.contentClaimOffset(1000L)
.size(1000L)
.build();
@@ -1042,7 +1042,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
- .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+ .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.build();
flowFileQueue.put(flowFileRecord);
@@ -1059,7 +1059,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
- .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+ .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.contentClaimOffset(1000L).size(1L).build();
flowFileQueue.put(flowFileRecord2);
@@ -1128,7 +1128,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
- .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+ .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.contentClaimOffset(0L).size(0L).build();
flowFileQueue.put(flowFileRecord);
@@ -1166,7 +1166,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
- .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
+ .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.contentClaimOffset(0L).size(0L).build();
flowFileQueue.put(flowFileRecord);
@@ -1395,7 +1395,7 @@ public class TestStandardProcessSession {
final Set<ContentClaim> claims = new HashSet<>();
for (long i = 0; i < idGenerator.get(); i++) {
- final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container", "section", String.valueOf(i), false);
+ final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container", "section", String.valueOf(i), false, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
if (getClaimantCount(contentClaim) > 0) {
claims.add(contentClaim);
@@ -1407,7 +1407,7 @@ public class TestStandardProcessSession {
@Override
public ContentClaim create(boolean lossTolerant) throws IOException {
- final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false);
+ final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
claimantCounts.put(contentClaim, new AtomicInteger(1));
http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
index 5733164..0b24c03 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
@@ -83,7 +83,7 @@ public class TestVolatileContentRepository {
final ContentRepository mockRepo = Mockito.mock(ContentRepository.class);
contentRepo.setBackupRepository(mockRepo);
- final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true);
+ final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(contentClaim);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index ee26d1f..ca79fab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -86,10 +86,10 @@ public class TestWriteAheadFlowFileRepository {
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
- final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false);
+ final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false, false);
final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L);
- final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false);
+ final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false, false);
final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L);
// Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then,
http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
index d29105a..867810e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
@@ -52,7 +52,7 @@ public class TestStandardResourceClaimManager {
}
};
- final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section", "id", false);
+ final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section", "id", false, false);
assertEquals(1, manager.incrementClaimantCount(resourceClaim)); // increment claimant count to 1.
assertEquals(1, manager.getClaimantCount(resourceClaim));