You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/07/29 23:02:19 UTC
[2/4] nifi git commit: NIFI-744: Refactored ContentClaim into
ContentClaim and ResourceClaim so that we can append to a single file in the
FileSystemRepository even after a session is completed
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 1171636..3a6338c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -23,14 +23,11 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
-import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
@@ -61,18 +58,20 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.controller.repository.io.SyncOnCloseOutputStream;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
-
-import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,8 +91,11 @@ public class FileSystemRepository implements ContentRepository {
private final AtomicLong index;
private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true);
- private final ConcurrentMap<String, BlockingQueue<ContentClaim>> reclaimable = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, BlockingQueue<ResourceClaim>> reclaimable = new ConcurrentHashMap<>();
private final Map<String, ContainerState> containerStateMap = new HashMap<>();
+ private final long maxAppendClaimLength = 1024L * 1024L; // 1 MB
+ private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100);
+ private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100);
private final boolean archiveData;
private final long maxArchiveMillis;
@@ -101,7 +103,7 @@ public class FileSystemRepository implements ContentRepository {
private final boolean alwaysSync;
private final ScheduledExecutorService containerCleanupExecutor;
- private ContentClaimManager contentClaimManager; // effectively final
+ private ResourceClaimManager resourceClaimManager; // effectively final
// Map of contianer to archived files that should be deleted next.
private final Map<String, BlockingQueue<ArchiveInfo>> archivedFiles = new HashMap<>();
@@ -113,7 +115,7 @@ public class FileSystemRepository implements ContentRepository {
final NiFiProperties properties = NiFiProperties.getInstance();
// determine the file repository paths and ensure they exist
final Map<String, Path> fileRespositoryPaths = properties.getContentRepositoryPaths();
- for (Path path : fileRespositoryPaths.values()) {
+ for (final Path path : fileRespositoryPaths.values()) {
Files.createDirectories(path);
}
@@ -122,7 +124,7 @@ public class FileSystemRepository implements ContentRepository {
index = new AtomicLong(0L);
for (final String containerName : containerNames) {
- reclaimable.put(containerName, new LinkedBlockingQueue<ContentClaim>(10000));
+ reclaimable.put(containerName, new LinkedBlockingQueue<ResourceClaim>(10000));
archivedFiles.put(containerName, new LinkedBlockingQueue<ArchiveInfo>(100000));
}
@@ -196,8 +198,8 @@ public class FileSystemRepository implements ContentRepository {
}
@Override
- public void initialize(final ContentClaimManager claimManager) {
- this.contentClaimManager = claimManager;
+ public void initialize(final ResourceClaimManager claimManager) {
+ this.resourceClaimManager = claimManager;
final NiFiProperties properties = NiFiProperties.getInstance();
@@ -231,6 +233,13 @@ public class FileSystemRepository implements ContentRepository {
public void shutdown() {
executor.shutdown();
containerCleanupExecutor.shutdown();
+
+ for (final OutputStream out : writableClaimStreams.values()) {
+ try {
+ out.close();
+ } catch (final IOException ioe) {
+ }
+ }
}
private static double getRatio(final String value) {
@@ -397,8 +406,8 @@ public class FileSystemRepository implements ContentRepository {
final String id = idPath.toFile().getName();
final String sectionName = sectionPath.toFile().getName();
- final ContentClaim contentClaim = contentClaimManager.newContentClaim(containerName, sectionName, id, false);
- if (contentClaimManager.getClaimantCount(contentClaim) == 0) {
+ final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false);
+ if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) {
removeIncompleteContent(fileToRemove);
}
}
@@ -427,15 +436,21 @@ public class FileSystemRepository implements ContentRepository {
}
private Path getPath(final ContentClaim claim) {
- final Path containerPath = containers.get(claim.getContainer());
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ return getPath(resourceClaim);
+ }
+
+ private Path getPath(final ResourceClaim resourceClaim) {
+ final Path containerPath = containers.get(resourceClaim.getContainer());
if (containerPath == null) {
return null;
}
- return containerPath.resolve(claim.getSection()).resolve(claim.getId());
+ return containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
}
private Path getPath(final ContentClaim claim, final boolean verifyExists) throws ContentNotFoundException {
- final Path containerPath = containers.get(claim.getContainer());
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ final Path containerPath = containers.get(resourceClaim.getContainer());
if (containerPath == null) {
if (verifyExists) {
throw new ContentNotFoundException(claim);
@@ -445,11 +460,11 @@ public class FileSystemRepository implements ContentRepository {
}
// Create the Path that points to the data
- Path resolvedPath = containerPath.resolve(claim.getSection()).resolve(String.valueOf(claim.getId()));
+ Path resolvedPath = containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
// If the data does not exist, create a Path that points to where the data would exist in the archive directory.
if (!Files.exists(resolvedPath)) {
- resolvedPath = getArchivePath(claim);
+ resolvedPath = getArchivePath(claim.getResourceClaim());
}
if (verifyExists && !Files.exists(resolvedPath)) {
@@ -460,34 +475,55 @@ public class FileSystemRepository implements ContentRepository {
@Override
public ContentClaim create(final boolean lossTolerant) throws IOException {
- final long currentIndex = index.incrementAndGet();
-
- String containerName = null;
- boolean waitRequired = true;
- ContainerState containerState = null;
- for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) {
- final long modulatedContainerIndex = containerIndex % containers.size();
- containerName = containerNames.get((int) modulatedContainerIndex);
-
- containerState = containerStateMap.get(containerName);
- if (!containerState.isWaitRequired()) {
- waitRequired = false;
- break;
- }
- }
+ ResourceClaim resourceClaim;
+
+ // We need to synchronize on this queue because the act of pulling something off
+ // the queue and incrementing the associated claimant count MUST be done atomically.
+ // This way, if the claimant count is decremented to 0, we can ensure that the
+ // claim is not then pulled from the queue and used as another thread is destroying/archiving
+ // the claim.
+ final long resourceOffset;
+ synchronized (writableClaimQueue) {
+ final ClaimLengthPair pair = writableClaimQueue.poll();
+ if (pair == null) {
+ final long currentIndex = index.incrementAndGet();
+
+ String containerName = null;
+ boolean waitRequired = true;
+ ContainerState containerState = null;
+ for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) {
+ final long modulatedContainerIndex = containerIndex % containers.size();
+ containerName = containerNames.get((int) modulatedContainerIndex);
+
+ containerState = containerStateMap.get(containerName);
+ if (!containerState.isWaitRequired()) {
+ waitRequired = false;
+ break;
+ }
+ }
- if (waitRequired) {
- containerState.waitForArchiveExpiration();
- }
+ if (waitRequired) {
+ containerState.waitForArchiveExpiration();
+ }
- final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER;
- final String section = String.valueOf(modulatedSectionIndex);
- final String claimId = System.currentTimeMillis() + "-" + currentIndex;
+ final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER;
+ final String section = String.valueOf(modulatedSectionIndex);
+ final String claimId = System.currentTimeMillis() + "-" + currentIndex;
- final ContentClaim claim = contentClaimManager.newContentClaim(containerName, section, claimId, lossTolerant);
- contentClaimManager.incrementClaimantCount(claim, true);
+ resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant);
+ resourceOffset = 0L;
+ LOG.debug("Creating new Resource Claim {}", resourceClaim);
+ } else {
+ resourceClaim = pair.getClaim();
+ resourceOffset = pair.getLength();
+ LOG.debug("Reusing Resource Claim {}", resourceClaim);
+ }
+
+ resourceClaimManager.incrementClaimantCount(resourceClaim, true);
+ }
- return claim;
+ final StandardContentClaim scc = new StandardContentClaim(resourceClaim, resourceOffset);
+ return scc;
}
@Override
@@ -496,7 +532,7 @@ public class FileSystemRepository implements ContentRepository {
return 0;
}
- return contentClaimManager.incrementClaimantCount(claim);
+ return resourceClaimManager.incrementClaimantCount(claim.getResourceClaim());
}
@Override
@@ -504,7 +540,7 @@ public class FileSystemRepository implements ContentRepository {
if (claim == null) {
return 0;
}
- return contentClaimManager.getClaimantCount(claim);
+ return resourceClaimManager.getClaimantCount(claim.getResourceClaim());
}
@Override
@@ -513,7 +549,7 @@ public class FileSystemRepository implements ContentRepository {
return 0;
}
- final int claimantCount = contentClaimManager.decrementClaimantCount(claim);
+ final int claimantCount = resourceClaimManager.decrementClaimantCount(claim.getResourceClaim());
return claimantCount;
}
@@ -523,9 +559,30 @@ public class FileSystemRepository implements ContentRepository {
return false;
}
+ return remove(claim.getResourceClaim());
+ }
+
+ private boolean remove(final ResourceClaim claim) {
+ if (claim == null) {
+ return false;
+ }
+
+ // we synchronize on the queue here because if the claimant count is 0,
+ // we need to be able to remove any instance of that resource claim from the
+ // queue atomically (i.e., the checking of the claimant count plus removal from the queue
+ // must be atomic)
+ synchronized (writableClaimQueue) {
+ final int claimantCount = resourceClaimManager.getClaimantCount(claim);
+ if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+ // if other content claims are claiming the same resource, we have nothing to destroy,
+ // so just consider the destruction successful.
+ return true;
+ }
+ }
+
Path path = null;
try {
- path = getPath(claim, false);
+ path = getPath(claim);
} catch (final ContentNotFoundException cnfe) {
}
@@ -538,6 +595,7 @@ public class FileSystemRepository implements ContentRepository {
return true;
}
+
@Override
public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException {
if (original == null) {
@@ -545,14 +603,11 @@ public class FileSystemRepository implements ContentRepository {
}
final ContentClaim newClaim = create(lossTolerant);
- final Path currPath = getPath(original, true);
- final Path newPath = getPath(newClaim);
- try (final FileOutputStream fos = new FileOutputStream(newPath.toFile())) {
- Files.copy(currPath, fos);
- if (alwaysSync) {
- fos.getFD().sync();
- }
+ try (final InputStream in = read(original);
+ final OutputStream out = write(newClaim)) {
+ StreamUtils.copy(in, out);
} catch (final IOException ioe) {
+ decrementClaimantCount(newClaim);
remove(newClaim);
throw ioe;
}
@@ -564,44 +619,28 @@ public class FileSystemRepository implements ContentRepository {
if (claims.contains(destination)) {
throw new IllegalArgumentException("destination cannot be within claims");
}
- try (final FileChannel dest = FileChannel.open(getPath(destination), StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
- long position = 0L;
- if (header != null && header.length > 0) {
- final ByteBuffer buffer = ByteBuffer.wrap(header);
- while (buffer.hasRemaining()) {
- position += dest.write(buffer, position);
- }
+
+ try (final ByteCountingOutputStream out = new ByteCountingOutputStream(write(destination))) {
+ if (header != null) {
+ out.write(header);
}
- int objectIndex = 0;
+
+ int i = 0;
for (final ContentClaim claim : claims) {
- long totalCopied = 0L;
- try (final FileChannel src = FileChannel.open(getPath(claim, true), StandardOpenOption.READ)) {
- while (totalCopied < src.size()) {
- final long copiedThisIteration = dest.transferFrom(src, position, Long.MAX_VALUE);
- totalCopied += copiedThisIteration;
- position += copiedThisIteration;
- }
+ try (final InputStream in = read(claim)) {
+ StreamUtils.copy(in, out);
}
- // don't add demarcator after the last claim
- if (demarcator != null && demarcator.length > 0 && (++objectIndex < claims.size())) {
- final ByteBuffer buffer = ByteBuffer.wrap(demarcator);
- while (buffer.hasRemaining()) {
- position += dest.write(buffer, position);
- }
- }
- }
- if (footer != null && footer.length > 0) {
- final ByteBuffer buffer = ByteBuffer.wrap(footer);
- while (buffer.hasRemaining()) {
- position += dest.write(buffer, position);
+
+ if (++i < claims.size() && demarcator != null) {
+ out.write(demarcator);
}
}
- if (alwaysSync) {
- dest.force(true);
+ if (footer != null) {
+ out.write(footer);
}
- return position;
+ return out.getBytesWritten();
}
}
@@ -624,12 +663,8 @@ public class FileSystemRepository implements ContentRepository {
@Override
public long importFrom(final InputStream content, final ContentClaim claim, final boolean append) throws IOException {
- try (final FileOutputStream out = new FileOutputStream(getPath(claim).toFile(), append)) {
- final long copied = StreamUtils.copy(content, out);
- if (alwaysSync) {
- out.getFD().sync();
- }
- return copied;
+ try (final OutputStream out = write(claim, append)) {
+ return StreamUtils.copy(content, out);
}
}
@@ -642,20 +677,14 @@ public class FileSystemRepository implements ContentRepository {
Files.createFile(destination);
return 0L;
}
- if (append) {
- try (final FileChannel sourceChannel = FileChannel.open(getPath(claim, true), StandardOpenOption.READ);
- final FileChannel destinationChannel = FileChannel.open(destination, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
- long position = destinationChannel.size();
- final long targetSize = position + sourceChannel.size();
- while (position < targetSize) {
- final long bytesCopied = destinationChannel.transferFrom(sourceChannel, position, Long.MAX_VALUE);
- position += bytesCopied;
- }
- return position;
+
+ try (final InputStream in = read(claim);
+ final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) {
+ final long copied = StreamUtils.copy(in, fos);
+ if (alwaysSync) {
+ fos.getFD().sync();
}
- } else {
- Files.copy(getPath(claim, true), destination, StandardCopyOption.REPLACE_EXISTING);
- return Files.size(destination);
+ return copied;
}
}
@@ -674,28 +703,20 @@ public class FileSystemRepository implements ContentRepository {
final long claimSize = size(claim);
if (offset > claimSize) {
- throw new IllegalArgumentException("offset of " + offset + " exceeds claim size of " + claimSize);
+ throw new IllegalArgumentException("Offset of " + offset + " exceeds claim size of " + claimSize);
}
- if (append) {
- try (final InputStream sourceStream = Files.newInputStream(getPath(claim, true), StandardOpenOption.READ);
- final OutputStream destinationStream = Files.newOutputStream(destination, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
- StreamUtils.skip(sourceStream, offset);
-
- final byte[] buffer = new byte[8192];
- int len;
- long copied = 0L;
- while ((len = sourceStream.read(buffer, 0, (int) Math.min(length - copied, buffer.length))) > 0) {
- destinationStream.write(buffer, 0, len);
- copied += len;
- }
- return copied;
+ try (final InputStream in = read(claim);
+ final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) {
+ if (offset > 0) {
+ StreamUtils.skip(in, offset);
}
- } else {
- try (final OutputStream out = Files.newOutputStream(destination, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
- return exportTo(claim, out, offset, length);
+ StreamUtils.copy(in, fos, length);
+ if (alwaysSync) {
+ fos.getFD().sync();
}
+ return length;
}
}
@@ -704,7 +725,10 @@ public class FileSystemRepository implements ContentRepository {
if (claim == null) {
return 0L;
}
- return Files.copy(getPath(claim, true), destination);
+
+ try (final InputStream in = read(claim)) {
+ return StreamUtils.copy(in, destination);
+ }
}
@Override
@@ -719,7 +743,7 @@ public class FileSystemRepository implements ContentRepository {
if (offset == 0 && length == claimSize) {
return exportTo(claim, destination);
}
- try (final InputStream in = Files.newInputStream(getPath(claim, true))) {
+ try (final InputStream in = read(claim)) {
StreamUtils.skip(in, offset);
final byte[] buffer = new byte[8192];
int len;
@@ -738,7 +762,12 @@ public class FileSystemRepository implements ContentRepository {
return 0L;
}
- return Files.size(getPath(claim, true));
+ // see javadocs for claim.getLength() as to why we do this.
+ if (claim.getLength() < 0) {
+ return Files.size(getPath(claim, true));
+ }
+
+ return claim.getLength();
}
@Override
@@ -747,16 +776,198 @@ public class FileSystemRepository implements ContentRepository {
return new ByteArrayInputStream(new byte[0]);
}
final Path path = getPath(claim, true);
- return new FileInputStream(path.toFile());
+ final FileInputStream fis = new FileInputStream(path.toFile());
+ if (claim.getOffset() > 0L) {
+ StreamUtils.skip(fis, claim.getOffset());
+ }
+
+ // see javadocs for claim.getLength() as to why we do this.
+ if (claim.getLength() >= 0) {
+ return new LimitedInputStream(fis, claim.getLength());
+ } else {
+ return fis;
+ }
}
@Override
- @SuppressWarnings("resource")
public OutputStream write(final ContentClaim claim) throws IOException {
- final FileOutputStream fos = new FileOutputStream(getPath(claim).toFile());
- return alwaysSync ? new SyncOnCloseOutputStream(fos) : fos;
+ return write(claim, false);
}
+ private OutputStream write(final ContentClaim claim, final boolean append) throws IOException {
+ if (claim == null) {
+ throw new NullPointerException("ContentClaim cannot be null");
+ }
+
+ if (!(claim instanceof StandardContentClaim)) {
+ // we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything
+ // else, just throw an Exception because it is not valid for this Repository
+ throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does not belong to this Content Repository");
+ }
+
+ final StandardContentClaim scc = (StandardContentClaim) claim;
+
+ // we always append because there may be another ContentClaim using the same resource claim.
+ // However, we know that we will never write to the same claim from two different threads
+ // at the same time because we will call create() to get the claim before we write to it,
+ // and when we call create(), it will remove it from the Queue, which means that no other
+ // thread will get the same Claim until we've finished writing to it.
+ ByteCountingOutputStream claimStream = writableClaimStreams.remove(scc.getResourceClaim());
+ final long initialLength;
+ if (claimStream == null) {
+ final File file = getPath(scc).toFile();
+ claimStream = new ByteCountingOutputStream(new FileOutputStream(file, true), file.length());
+ initialLength = 0L;
+ } else {
+ if (append) {
+ initialLength = Math.max(0, scc.getLength());
+ } else {
+ initialLength = 0;
+ scc.setOffset(claimStream.getBytesWritten());
+ }
+ }
+
+ final ByteCountingOutputStream bcos = claimStream;
+ final OutputStream out = new OutputStream() {
+ private long bytesWritten = 0L;
+ private boolean recycle = true;
+ private boolean closed = false;
+
+ @Override
+ public String toString() {
+ return "FileSystemRepository Stream [" + scc + "]";
+ }
+
+ @Override
+ public synchronized void write(final int b) throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed");
+ }
+
+ try {
+ bcos.write(b);
+ } catch (final IOException ioe) {
+ recycle = false;
+ throw new IOException("Failed to write to " + this, ioe);
+ }
+
+ bytesWritten++;
+ scc.setLength(bytesWritten + initialLength);
+ }
+
+ @Override
+ public synchronized void write(final byte[] b) throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed");
+ }
+
+ try {
+ bcos.write(b);
+ } catch (final IOException ioe) {
+ recycle = false;
+ throw new IOException("Failed to write to " + this, ioe);
+ }
+
+ bytesWritten += b.length;
+ scc.setLength(bytesWritten + initialLength);
+ }
+
+ @Override
+ public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed");
+ }
+
+ try {
+ bcos.write(b, off, len);
+ } catch (final IOException ioe) {
+ recycle = false;
+ throw new IOException("Failed to write to " + this, ioe);
+ }
+
+ bytesWritten += len;
+ scc.setLength(bytesWritten + initialLength);
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed");
+ }
+
+ bcos.flush();
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ closed = true;
+
+ if (alwaysSync) {
+ ((FileOutputStream) bcos.getWrappedStream()).getFD().sync();
+ }
+
+ if (scc.getLength() < 0) {
+ // If claim was not written to, set length to 0
+ scc.setLength(0L);
+ }
+
+ // if we've not yet hit the threshold for appending to a resource claim, add the claim
+ // to the writableClaimQueue so that the Resource Claim can be used again when create()
+ // is called. In this case, we don't have to actually close the file stream. Instead, we
+ // can just add it onto the queue and continue to use it for the next content claim.
+ final long resourceClaimLength = scc.getOffset() + scc.getLength();
+ if (recycle && resourceClaimLength < maxAppendClaimLength) {
+ // we do not have to synchronize on the writable claim queue here because we
+ // are only adding something to the queue. We must synchronize if we are
+ // using a ResourceClaim from the queue and incrementing the claimant count on that resource
+ // because those need to be done atomically, or if we are destroying a claim that is on
+ // the queue because we need to ensure that the latter operation does not cause problems
+ // with the former.
+ final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength);
+ final boolean enqueued;
+ if (writableClaimQueue.contains(pair)) {
+ // may already exist on the queue, if the content claim is written to multiple times.
+ enqueued = true;
+ } else {
+ enqueued = writableClaimQueue.offer(pair);
+ }
+
+ if (enqueued) {
+ writableClaimStreams.put(scc.getResourceClaim(), bcos);
+ LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
+ } else {
+ writableClaimStreams.remove(scc.getResourceClaim());
+ bcos.close();
+
+ LOG.debug("Claim length less than max; Closing {}", this);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
+ }
+ }
+ } else {
+ // we've reached the limit for this claim. Don't add it back to our queue.
+ // Instead, just remove it and move on.
+ writableClaimStreams.remove(scc.getResourceClaim());
+ // ensure that the claim is no longer on the queue
+ writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
+ bcos.close();
+ LOG.debug("Claim lenth >= max; Closing {}", this);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
+ }
+ }
+ }
+ };
+
+ LOG.debug("Writing to {}", out);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for writing to " + out));
+ }
+
+ return out;
+ }
+
+
@Override
public void purge() {
// delete all content from repositories
@@ -788,7 +999,7 @@ public class FileSystemRepository implements ContentRepository {
}
}
- contentClaimManager.purge();
+ resourceClaimManager.purge();
}
private class BinDestructableClaims implements Runnable {
@@ -800,17 +1011,17 @@ public class FileSystemRepository implements ContentRepository {
// because the Container generally maps to a physical partition on the disk, so we want a few
// different threads hitting the different partitions but don't want multiple threads hitting
// the same partition.
- final List<ContentClaim> toDestroy = new ArrayList<>();
+ final List<ResourceClaim> toDestroy = new ArrayList<>();
while (true) {
toDestroy.clear();
- contentClaimManager.drainDestructableClaims(toDestroy, 10000);
+ resourceClaimManager.drainDestructableClaims(toDestroy, 10000);
if (toDestroy.isEmpty()) {
return;
}
- for (final ContentClaim claim : toDestroy) {
+ for (final ResourceClaim claim : toDestroy) {
final String container = claim.getContainer();
- final BlockingQueue<ContentClaim> claimQueue = reclaimable.get(container);
+ final BlockingQueue<ResourceClaim> claimQueue = reclaimable.get(container);
try {
while (true) {
@@ -838,7 +1049,7 @@ public class FileSystemRepository implements ContentRepository {
return sectionPath.resolve(ARCHIVE_DIR_NAME).resolve(claimId);
}
- private Path getArchivePath(final ContentClaim claim) {
+ private Path getArchivePath(final ResourceClaim claim) {
final String claimId = claim.getId();
final Path containerPath = containers.get(claim.getContainer());
final Path archivePath = containerPath.resolve(claim.getSection()).resolve(ARCHIVE_DIR_NAME).resolve(claimId);
@@ -859,22 +1070,28 @@ public class FileSystemRepository implements ContentRepository {
return true;
}
- return Files.exists(getArchivePath(contentClaim));
+ return Files.exists(getArchivePath(contentClaim.getResourceClaim()));
}
- private void archive(final ContentClaim contentClaim) throws IOException {
+ private void archive(final ResourceClaim claim) throws IOException {
if (!archiveData) {
return;
}
- final int claimantCount = getClaimantCount(contentClaim);
- if (claimantCount > 0) {
- throw new IllegalStateException("Cannot archive ContentClaim " + contentClaim + " because it is currently in use");
+ synchronized (writableClaimQueue) {
+ final int claimantCount = claim == null ? 0 : resourceClaimManager.getClaimantCount(claim);
+ if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+ return;
+ }
+ }
+
+ final Path curPath = getPath(claim);
+ if (curPath == null) {
+ return;
}
- final Path curPath = getPath(contentClaim, true);
archive(curPath);
- LOG.debug("Successfully moved {} to archive", contentClaim);
+ LOG.debug("Successfully moved {} to archive", claim);
}
private void archive(final Path curPath) throws IOException {
@@ -1107,8 +1324,8 @@ public class FileSystemRepository implements ContentRepository {
while (true) {
// look through each of the binned queues of Content Claims
int successCount = 0;
- final List<ContentClaim> toRemove = new ArrayList<>();
- for (final Map.Entry<String, BlockingQueue<ContentClaim>> entry : reclaimable.entrySet()) {
+ final List<ResourceClaim> toRemove = new ArrayList<>();
+ for (final Map.Entry<String, BlockingQueue<ResourceClaim>> entry : reclaimable.entrySet()) {
// drain the queue of all ContentClaims that can be destroyed for the given container.
final String container = entry.getKey();
final ContainerState containerState = containerStateMap.get(container);
@@ -1121,7 +1338,7 @@ public class FileSystemRepository implements ContentRepository {
// destroy each claim for this container
final long start = System.nanoTime();
- for (final ContentClaim claim : toRemove) {
+ for (final ResourceClaim claim : toRemove) {
if (archiveData) {
try {
archive(claim);
@@ -1210,7 +1427,7 @@ public class FileSystemRepository implements ContentRepository {
@Override
public void run() {
try {
- if (oldestArchiveDate.get() > (System.currentTimeMillis() - maxArchiveMillis)) {
+ if (oldestArchiveDate.get() > System.currentTimeMillis() - maxArchiveMillis) {
final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName);
if (minRequiredSpace == null) {
return;
@@ -1245,7 +1462,7 @@ public class FileSystemRepository implements ContentRepository {
if (oldestContainerArchive < 0L) {
boolean updated;
do {
- long oldest = oldestArchiveDate.get();
+ final long oldest = oldestArchiveDate.get();
if (oldestContainerArchive < oldest) {
updated = oldestArchiveDate.compareAndSet(oldest, oldestContainerArchive);
@@ -1298,7 +1515,7 @@ public class FileSystemRepository implements ContentRepository {
final long free = getContainerUsableSpace(containerName);
used = capacity - free;
bytesUsed = used;
- } catch (IOException e) {
+ } catch (final IOException e) {
return false;
}
}
@@ -1317,7 +1534,7 @@ public class FileSystemRepository implements ContentRepository {
try {
LOG.info("Unable to write to container {} due to archive file size constraints; waiting for archive cleanup", containerName);
condition.await();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
}
}
} finally {
@@ -1355,4 +1572,55 @@ public class FileSystemRepository implements ContentRepository {
}
}
+
+ /**
+ * Pairs a ResourceClaim with a length value. Both equals and hashCode consider only the
+ * ResourceClaim; the length does not take part in either.
+ */
+ private static class ClaimLengthPair {
+ private final ResourceClaim claim;
+ private final Long length;
+
+ public ClaimLengthPair(final ResourceClaim claim, final Long length) {
+ this.claim = claim;
+ this.length = length;
+ }
+
+ public ResourceClaim getClaim() {
+ return claim;
+ }
+
+ public Long getLength() {
+ return length;
+ }
+
+ /**
+ * Hash is derived from the ResourceClaim only; a null claim contributes 0.
+ *
+ * @return the hash code for this pair
+ */
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (claim == null ? 0 : claim.hashCode());
+ return result;
+ }
+
+ /**
+ * Equality is determined purely by the ResourceClaim's equality
+ *
+ * @param obj the object to compare against
+ * @return true if obj is a ClaimLengthPair whose ResourceClaim equals this one's
+ * (two null claims are considered equal); false otherwise
+ */
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ final ClaimLengthPair other = (ClaimLengthPair) obj;
+ // hashCode accepts a null claim, so equals must do the same instead of throwing NullPointerException
+ if (claim == null) {
+ return other.getClaim() == null;
+ }
+ return claim.equals(other.getClaim());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
index 6524cd3..cc8c734 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
@@ -95,7 +95,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
@Override
public boolean isPenalized() {
- return (penaltyExpirationMs > 0) ? penaltyExpirationMs > System.currentTimeMillis() : false;
+ return penaltyExpirationMs > 0 ? penaltyExpirationMs > System.currentTimeMillis() : false;
}
@Override
@@ -150,7 +150,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
public String toString() {
final ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
builder.append("uuid", getAttribute(CoreAttributes.UUID.key()));
- builder.append("claim", claim == null ? "" : claim.getId());
+ builder.append("claim", claim == null ? "" : claim.toString());
builder.append("offset", claimOffset);
builder.append("name", getAttribute(CoreAttributes.FILENAME.key())).append("size", size);
return builder.toString();
@@ -169,7 +169,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private final Set<String> bLineageIdentifiers = new HashSet<>();
private long bPenaltyExpirationMs = -1L;
private long bSize = 0L;
- private Map<String, String> bAttributes = new HashMap<>();
+ private final Map<String, String> bAttributes = new HashMap<>();
private ContentClaim bClaim = null;
private long bClaimOffset = 0L;
private long bLastQueueDate = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 04e819e..c245424 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -44,6 +44,7 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowFileQueue;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
import org.apache.nifi.controller.repository.io.ByteCountingOutputStream;
import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
@@ -54,10 +55,6 @@ import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.repository.io.LongHolder;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.NonCloseableInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.QueueSize;
@@ -75,7 +72,8 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,10 +90,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback
public static final int VERBOSE_LOG_THRESHOLD = 10;
- private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize(
- NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue();
- private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim();
-
public static final String DEFAULT_FLOWFILE_PATH = "./";
private static final Logger LOG = LoggerFactory.getLogger(StandardProcessSession.class);
@@ -124,12 +118,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final LongHolder bytesWritten = new LongHolder(0L);
private int flowFilesIn = 0, flowFilesOut = 0;
private long contentSizeIn = 0L, contentSizeOut = 0L;
- private int writeRecursionLevel = 0;
-
- private ContentClaim currentWriteClaim = null;
- private OutputStream currentWriteClaimStream = null;
- private long currentWriteClaimSize = 0L;
- private int currentWriteClaimFlowFileCount = 0;
private ContentClaim currentReadClaim = null;
private ByteCountingInputStream currentReadClaimStream = null;
@@ -690,12 +678,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (originalClaim == null) {
builder.setCurrentContentClaim(null, null, null, null, 0L);
} else {
+ final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
builder.setCurrentContentClaim(
- originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
- );
+ resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+ repoRecord.getOriginal().getContentClaimOffset() + originalClaim.getOffset(), repoRecord.getOriginal().getSize());
builder.setPreviousContentClaim(
- originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
- );
+ resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+ repoRecord.getOriginal().getContentClaimOffset() + originalClaim.getOffset(), repoRecord.getOriginal().getSize());
}
}
@@ -711,14 +700,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ContentClaim currentClaim = repoRecord.getCurrentClaim();
final long currentOffset = repoRecord.getCurrentClaimOffset();
final long size = flowFile.getSize();
- recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size);
+
+ final ResourceClaim resourceClaim = currentClaim.getResourceClaim();
+ recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size);
}
if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
final ContentClaim originalClaim = repoRecord.getOriginalClaim();
final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
final long originalSize = repoRecord.getOriginal().getSize();
- recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize);
+
+ final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
+ recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), originalOffset + originalClaim.getOffset(), originalSize);
}
final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
@@ -741,14 +734,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ContentClaim currentClaim = repoRecord.getCurrentClaim();
final long currentOffset = repoRecord.getCurrentClaimOffset();
final long size = eventFlowFile.getSize();
- recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size);
+
+ final ResourceClaim resourceClaim = currentClaim.getResourceClaim();
+ recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size);
}
if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
final ContentClaim originalClaim = repoRecord.getOriginalClaim();
final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
final long originalSize = repoRecord.getOriginal().getSize();
- recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize);
+
+ final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
+ recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), originalOffset + originalClaim.getOffset(), originalSize);
}
final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
@@ -1650,8 +1647,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ContentClaim claim = record.getContentClaim();
if (claim != null) {
- enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
- enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ enriched.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+ record.getContentClaimOffset() + claim.getOffset(), record.getSize());
+ enriched.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+ record.getContentClaimOffset() + claim.getOffset(), record.getSize());
}
enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
@@ -1695,7 +1695,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
StreamUtils.skip(currentReadClaimStream, bytesToSkip);
}
- return new NonCloseableInputStream(currentReadClaimStream);
+ return new DisableOnCloseInputStream(currentReadClaimStream);
}
}
@@ -1711,7 +1711,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
// reuse the same InputStream for the next FlowFile
- return new NonCloseableInputStream(currentReadClaimStream);
+ return new DisableOnCloseInputStream(currentReadClaimStream);
} else {
final InputStream rawInStream = context.getContentRepository().read(claim);
StreamUtils.skip(rawInStream, offset);
@@ -1862,30 +1862,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return newFile;
}
- private void enforceCurrentWriteClaimState() {
- if (currentWriteClaimFlowFileCount > MAX_FLOWFILES_PER_CLAIM || currentWriteClaimSize > MAX_APPENDABLE_CLAIM_SIZE) {
- resetWriteClaims();
- }
-
- if (currentWriteClaimStream == null) {
- try {
- currentWriteClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
- claimLog.debug("Creating ContentClaim {} to enforce Current Write Claim State for {}", currentWriteClaim, context.getConnectable());
- } catch (final IOException e) {
- throw new FlowFileHandlingException("Unable to create ContentClaim due to " + e.toString(), e);
- }
-
- try {
- currentWriteClaimStream = context.getContentRepository().write(currentWriteClaim);
- } catch (final IOException e) {
- resetWriteClaims();
- throw new FlowFileAccessException("Unable to obtain stream for writing to Content Repostiory: " + e, e);
- }
- } else {
- context.getContentRepository().incrementClaimaintCount(currentWriteClaim);
- }
- }
-
private void ensureNotAppending(final ContentClaim claim) throws IOException {
if (claim == null) {
return;
@@ -1905,78 +1881,38 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
long newSize = 0L;
- long claimOffset = 0L;
+ final long claimOffset = 0L;
ContentClaim newClaim = null;
final LongHolder writtenHolder = new LongHolder(0L);
- final boolean appendToClaim = isMergeContent();
try {
- if (appendToClaim) {
- enforceCurrentWriteClaimState();
- claimOffset = currentWriteClaimSize;
- newClaim = currentWriteClaim;
- ensureNotAppending(newClaim);
-
- try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream);
- final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
-
- recursionSet.add(source);
-
- writeRecursionLevel++;
- try {
- writer.process(new FlowFileAccessOutputStream(countingOut, source));
- } finally {
- writeRecursionLevel--;
- }
- } finally {
- recursionSet.remove(source);
- }
-
- final long writtenThisCall = writtenHolder.getValue();
- newSize = writtenThisCall;
- currentWriteClaimSize += newSize;
- } else {
- newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
- claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
-
- ensureNotAppending(newClaim);
- try (final OutputStream stream = context.getContentRepository().write(newClaim);
- final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) {
- recursionSet.add(source);
+ newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+ claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
- writeRecursionLevel++;
- try {
- writer.process(new FlowFileAccessOutputStream(countingOut, source));
- } finally {
- writeRecursionLevel--;
- }
- } finally {
- recursionSet.remove(source);
- }
- newSize = context.getContentRepository().size(newClaim);
+ ensureNotAppending(newClaim);
+ try (final OutputStream stream = context.getContentRepository().write(newClaim);
+ final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
+ final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
+ recursionSet.add(source);
+ writer.process(new FlowFileAccessOutputStream(countingOut, source));
+ } finally {
+ recursionSet.remove(source);
}
+ newSize = context.getContentRepository().size(newClaim);
} catch (final ContentNotFoundException nfe) {
- if (appendToClaim) {
- resetWriteClaims(); // need to reset write claim before we can remove the claim
- }
+ resetWriteClaims(); // need to reset write claim before we can remove the claim
destroyContent(newClaim);
handleContentNotFound(nfe, record);
} catch (final FlowFileAccessException ffae) {
- if (appendToClaim) {
- resetWriteClaims(); // need to reset write claim before we can remove the claim
- }
+ resetWriteClaims(); // need to reset write claim before we can remove the claim
destroyContent(newClaim);
throw ffae;
} catch (final IOException ioe) {
- if (appendToClaim) {
- resetWriteClaims(); // need to reset write claim before we can remove the claim
- }
+ resetWriteClaims(); // need to reset write claim before we can remove the claim
destroyContent(newClaim);
throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
} catch (final Throwable t) {
- if (appendToClaim) {
- resetWriteClaims(); // need to reset write claim before we can remove the claim
- }
+ resetWriteClaims(); // need to reset write claim before we can remove the claim
destroyContent(newClaim);
throw t;
} finally {
@@ -2021,13 +1957,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// wrap our OutputStreams so that the processor cannot close it
try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) {
recursionSet.add(source);
-
- writeRecursionLevel++;
- try {
- writer.process(new FlowFileAccessOutputStream(disableOnClose, source));
- } finally {
- writeRecursionLevel--;
- }
+ writer.process(new FlowFileAccessOutputStream(disableOnClose, source));
} finally {
recursionSet.remove(source);
}
@@ -2039,13 +1969,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// wrap our OutputStreams so that the processor cannot close it
try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) {
recursionSet.add(source);
-
- writeRecursionLevel++;
- try {
- writer.process(disableOnClose);
- } finally {
- writeRecursionLevel--;
- }
+ writer.process(disableOnClose);
} finally {
recursionSet.remove(source);
}
@@ -2115,18 +2039,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
private void resetWriteClaims() {
- try {
- if (currentWriteClaimStream != null) {
- currentWriteClaimStream.flush();
- currentWriteClaimStream.close();
- }
- } catch (final Exception e) {
- }
- currentWriteClaimStream = null;
- currentWriteClaim = null;
- currentWriteClaimFlowFileCount = 0;
- currentWriteClaimSize = 0L;
-
for (final ByteCountingOutputStream out : appendableStreams.values()) {
try {
out.flush();
@@ -2148,15 +2060,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
currentReadClaim = null;
}
- /**
- * @return Indicates whether or not multiple FlowFiles should be merged into a single ContentClaim
- */
- private boolean isMergeContent() {
- if (writeRecursionLevel > 0) {
- return false;
- }
- return true;
- }
@Override
public FlowFile write(final FlowFile source, final StreamCallback writer) {
@@ -2166,116 +2069,58 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ContentClaim newClaim = null;
long newSize = 0L;
- long claimOffset = 0L;
+ final long claimOffset = 0L;
final LongHolder writtenHolder = new LongHolder(0L);
- final boolean appendToClaim = isMergeContent();
try {
- if (appendToClaim) {
- enforceCurrentWriteClaimState();
- claimOffset = currentWriteClaimSize;
- newClaim = currentWriteClaim;
- ensureNotAppending(newClaim);
-
- try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset());
- final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
- final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
- final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
- final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream);
- final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
+ newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+ claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
- recursionSet.add(source);
+ ensureNotAppending(newClaim);
- // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
- // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
- // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
- // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
- // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
- final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim);
- boolean cnfeThrown = false;
-
- writeRecursionLevel++;
- try {
- writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source));
- } catch (final ContentNotFoundException cnfe) {
- cnfeThrown = true;
- throw cnfe;
- } finally {
- writeRecursionLevel--;
- recursionSet.remove(source);
+ try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset());
+ final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
+ final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+ final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
+ final OutputStream os = context.getContentRepository().write(newClaim);
+ final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
+ final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
- // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
- if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
- throw ffais.getContentNotFoundException();
- }
- }
- }
-
- final long writtenThisCall = writtenHolder.getValue();
- newSize = writtenThisCall;
- currentWriteClaimSize += writtenThisCall;
- } else {
- newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
- claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
-
- ensureNotAppending(newClaim);
-
- try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset());
- final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
- final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead);
- final OutputStream os = context.getContentRepository().write(newClaim);
- final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) {
+ recursionSet.add(source);
- recursionSet.add(source);
+ // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
+ // Processor code. As a result, we have the FlowFileAccessInputStream that catches IOException from the repository
+ // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
+ // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
+ // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
+ final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim);
+ boolean cnfeThrown = false;
- // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
- // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
- // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
- // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
- // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
- final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim);
- boolean cnfeThrown = false;
-
- writeRecursionLevel++;
- try {
- writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source));
- } catch (final ContentNotFoundException cnfe) {
- cnfeThrown = true;
- throw cnfe;
- } finally {
- writeRecursionLevel--;
- recursionSet.remove(source);
+ try {
+ writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source));
+ } catch (final ContentNotFoundException cnfe) {
+ cnfeThrown = true;
+ throw cnfe;
+ } finally {
+ recursionSet.remove(source);
- // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
- if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
- throw ffais.getContentNotFoundException();
- }
+ // if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate.
+ if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
+ throw ffais.getContentNotFoundException();
}
}
-
- newSize = context.getContentRepository().size(newClaim);
}
+
+ newSize = context.getContentRepository().size(newClaim);
} catch (final ContentNotFoundException nfe) {
- if (appendToClaim) {
- resetWriteClaims(); // need to reset write claim before we can remove the claim
- }
destroyContent(newClaim);
handleContentNotFound(nfe, record);
} catch (final IOException ioe) {
- if (appendToClaim) {
- resetWriteClaims(); // need to reset write claim before we can remove the claim
- }
destroyContent(newClaim);
throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
} catch (final FlowFileAccessException ffae) {
- if (appendToClaim) {
- resetWriteClaims();
- }
destroyContent(newClaim);
throw ffae;
} catch (final Throwable t) {
- if (appendToClaim) {
- resetWriteClaims(); // need to reset write claim before we can remove the claim
- }
destroyContent(newClaim);
throw t;
} finally {
@@ -2302,33 +2147,20 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ContentClaim newClaim;
final long claimOffset;
- final boolean appendToClaim = isMergeContent();
- if (appendToClaim) {
- enforceCurrentWriteClaimState();
- newClaim = currentWriteClaim;
- claimOffset = currentWriteClaimSize;
- } else {
- try {
- newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
- claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
- } catch (final IOException e) {
- throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
- }
-
- claimOffset = 0L;
+ try {
+ newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+ claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
+ } catch (final IOException e) {
+ throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
}
+ claimOffset = 0L;
long newSize = 0L;
try {
- final boolean append = isMergeContent();
- newSize = context.getContentRepository().importFrom(source, newClaim, append);
+ newSize = context.getContentRepository().importFrom(source, newClaim, false);
bytesWritten.increment(newSize);
bytesRead.increment(newSize);
- currentWriteClaimSize += newSize;
} catch (final Throwable t) {
- if (appendToClaim) {
- resetWriteClaims();
- }
destroyContent(newClaim);
throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
}
@@ -2351,40 +2183,24 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
validateRecordState(destination);
final StandardRepositoryRecord record = records.get(destination);
ContentClaim newClaim = null;
- long claimOffset = 0L;
+ final long claimOffset = 0L;
final long newSize;
- final boolean appendToClaim = isMergeContent();
try {
- if (appendToClaim) {
- enforceCurrentWriteClaimState();
- newClaim = currentWriteClaim;
- claimOffset = currentWriteClaimSize;
-
- final long bytesCopied = StreamUtils.copy(source, currentWriteClaimStream);
- bytesWritten.increment(bytesCopied);
- currentWriteClaimSize += bytesCopied;
- newSize = bytesCopied;
- } else {
- try {
- newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
- claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
+ try {
+ newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+ claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
- newSize = context.getContentRepository().importFrom(source, newClaim, appendToClaim);
- bytesWritten.increment(newSize);
- currentWriteClaimSize += newSize;
- } catch (final IOException e) {
- throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
- }
+ newSize = context.getContentRepository().importFrom(source, newClaim, false);
+ bytesWritten.increment(newSize);
+ } catch (final IOException e) {
+ throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
}
} catch (final Throwable t) {
- if (appendToClaim) {
- resetWriteClaims();
- }
-
if (newClaim != null) {
destroyContent(newClaim);
}
+
throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 3bfdd8a..6c1626c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -27,8 +27,10 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -38,7 +40,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream;
import org.apache.nifi.controller.repository.io.MemoryManager;
@@ -92,7 +95,7 @@ public class VolatileContentRepository implements ContentRepository {
private final ConcurrentMap<ContentClaim, ContentClaim> backupRepoClaimMap = new ConcurrentHashMap<>(256);
private final AtomicReference<ContentRepository> backupRepositoryRef = new AtomicReference<>(null);
- private ContentClaimManager claimManager; // effectively final
+ private ResourceClaimManager claimManager; // effectively final
public VolatileContentRepository() {
this(NiFiProperties.getInstance());
@@ -119,7 +122,7 @@ public class VolatileContentRepository implements ContentRepository {
}
@Override
- public void initialize(final ContentClaimManager claimManager) {
+ public void initialize(final ResourceClaimManager claimManager) {
this.claimManager = claimManager;
for (int i = 0; i < 3; i++) {
@@ -199,9 +202,10 @@ public class VolatileContentRepository implements ContentRepository {
private ContentClaim createLossTolerant() {
final long id = idGenerator.getAndIncrement();
- final ContentClaim claim = claimManager.newContentClaim(CONTAINER_NAME, "section", String.valueOf(id), true);
+ final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true);
+ final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L);
final ContentBlock contentBlock = new ContentBlock(claim, repoSize);
- claimManager.incrementClaimantCount(claim, true);
+ claimManager.incrementClaimantCount(resourceClaim, true);
claimMap.put(claim, contentBlock);
@@ -216,7 +220,7 @@ public class VolatileContentRepository implements ContentRepository {
}
final ContentClaim backupClaim = getBackupClaim(claim);
if (backupClaim == null) {
- return claimManager.incrementClaimantCount(resolveClaim(claim));
+ return claimManager.incrementClaimantCount(resolveClaim(claim).getResourceClaim());
} else {
return getBackupRepository().incrementClaimaintCount(backupClaim);
}
@@ -230,7 +234,7 @@ public class VolatileContentRepository implements ContentRepository {
final ContentClaim backupClaim = getBackupClaim(claim);
if (backupClaim == null) {
- return claimManager.decrementClaimantCount(resolveClaim(claim));
+ return claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim());
} else {
return getBackupRepository().decrementClaimantCount(backupClaim);
}
@@ -244,7 +248,7 @@ public class VolatileContentRepository implements ContentRepository {
final ContentClaim backupClaim = getBackupClaim(claim);
if (backupClaim == null) {
- return claimManager.getClaimantCount(resolveClaim(claim));
+ return claimManager.getClaimantCount(resolveClaim(claim).getResourceClaim());
} else {
return getBackupRepository().getClaimantCount(backupClaim);
}
@@ -273,6 +277,29 @@ public class VolatileContentRepository implements ContentRepository {
return true;
}
+ private boolean remove(final ResourceClaim claim) {
+ if (claim == null) {
+ return false;
+ }
+
+ final Set<ContentClaim> contentClaims = new HashSet<>();
+ for (final Map.Entry<ContentClaim, ContentBlock> entry : claimMap.entrySet()) {
+ final ContentClaim contentClaim = entry.getKey();
+ if (contentClaim.getResourceClaim().equals(claim)) {
+ contentClaims.add(contentClaim);
+ }
+ }
+
+ boolean removed = false;
+ for (final ContentClaim contentClaim : contentClaims) {
+ if (remove(contentClaim)) {
+ removed = true;
+ }
+ }
+
+ return removed;
+ }
+
@Override
public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException {
final ContentClaim createdClaim = create(lossTolerant);
@@ -435,7 +462,7 @@ public class VolatileContentRepository implements ContentRepository {
@Override
public void purge() {
for (final ContentClaim claim : claimMap.keySet()) {
- claimManager.decrementClaimantCount(resolveClaim(claim));
+ claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim());
final ContentClaim backup = getBackupClaim(claim);
if (backup != null) {
getBackupRepository().remove(backup);
@@ -624,7 +651,7 @@ public class VolatileContentRepository implements ContentRepository {
@Override
public void run() {
- final List<ContentClaim> destructable = new ArrayList<>(1000);
+ final List<ResourceClaim> destructable = new ArrayList<>(1000);
while (true) {
destructable.clear();
claimManager.drainDestructableClaims(destructable, 1000, 5, TimeUnit.SECONDS);
@@ -632,7 +659,7 @@ public class VolatileContentRepository implements ContentRepository {
return;
}
- for (final ContentClaim claim : destructable) {
+ for (final ResourceClaim claim : destructable) {
remove(claim);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9f32074c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index fe34fe0..a85b23b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -22,7 +22,9 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
/**
* <p>
@@ -32,10 +34,10 @@ import org.apache.nifi.controller.repository.claim.ContentClaimManager;
public class VolatileFlowFileRepository implements FlowFileRepository {
private final AtomicLong idGenerator = new AtomicLong(0L);
- private ContentClaimManager claimManager; // effectively final
+ private ResourceClaimManager claimManager; // effectively final
@Override
- public void initialize(final ContentClaimManager claimManager) {
+ public void initialize(final ResourceClaimManager claimManager) {
this.claimManager = claimManager;
}
@@ -58,23 +60,49 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
public void close() throws IOException {
}
+ private void markDestructable(final ContentClaim contentClaim) {
+ if (contentClaim == null) {
+ return;
+ }
+
+ final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+ if (resourceClaim == null) {
+ return;
+ }
+
+ claimManager.markDestructable(resourceClaim);
+ }
+
+ private int getClaimantCount(final ContentClaim claim) {
+ if (claim == null) {
+ return 0;
+ }
+
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ if (resourceClaim == null) {
+ return 0;
+ }
+
+ return claimManager.getClaimantCount(resourceClaim);
+ }
+
@Override
public void updateRepository(final Collection<RepositoryRecord> records) throws IOException {
for (final RepositoryRecord record : records) {
if (record.getType() == RepositoryRecordType.DELETE) {
// For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable
- if (record.getCurrentClaim() != null && claimManager.getClaimantCount(record.getCurrentClaim()) <= 0) {
- claimManager.markDestructable(record.getCurrentClaim());
+ if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) {
+ markDestructable(record.getCurrentClaim());
}
// If the original claim is different than the current claim and the original claim has a claimant count <= 0, mark it as destructable.
- if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
- claimManager.markDestructable(record.getOriginalClaim());
+ if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && getClaimantCount(record.getOriginalClaim()) <= 0) {
+ markDestructable(record.getOriginalClaim());
}
} else if (record.getType() == RepositoryRecordType.UPDATE) {
// if we have an update, and the original is no longer needed, mark original as destructable
- if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
- claimManager.markDestructable(record.getOriginalClaim());
+ if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && getClaimantCount(record.getOriginalClaim()) <= 0) {
+ markDestructable(record.getOriginalClaim());
}
}
}