You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/07/29 21:59:57 UTC
[1/3] nifi git commit: NIFI-744: Refactored ContentClaim into
ContentClaim and ResourceClaim so that we can append to a single file in the
FileSystemRepository even after a session is completed
Repository: nifi
Updated Branches:
refs/heads/NIFI-744 [created] 50379fcc0
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index f2df821..5ee5fb5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -43,11 +43,12 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.controller.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
@@ -86,7 +87,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// effectively final
private WriteAheadRepository<RepositoryRecord> wal;
private WriteAheadRecordSerde serde;
- private ContentClaimManager claimManager;
+ private ResourceClaimManager claimManager;
// WALI Provides the ability to register callbacks for when a Partition or the entire Repository is sync'ed with the underlying disk.
// We keep track of this because we need to ensure that the ContentClaims are destroyed only after the FlowFile Repository has been
@@ -125,7 +126,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
@Override
- public void initialize(final ContentClaimManager claimManager) throws IOException {
+ public void initialize(final ResourceClaimManager claimManager) throws IOException {
this.claimManager = claimManager;
Files.createDirectories(flowFileRepositoryPath);
@@ -168,6 +169,32 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
updateRepository(records, alwaysSync);
}
+ private void markDestructable(final ContentClaim contentClaim) {
+ if (contentClaim == null) {
+ return;
+ }
+
+ final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+ if (resourceClaim == null) {
+ return;
+ }
+
+ claimManager.markDestructable(resourceClaim);
+ }
+
+ private int getClaimantCount(final ContentClaim claim) {
+ if (claim == null) {
+ return 0;
+ }
+
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ if (resourceClaim == null) {
+ return 0;
+ }
+
+ return claimManager.getClaimantCount(resourceClaim);
+ }
+
private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException {
for (final RepositoryRecord record : records) {
if (record.getType() != RepositoryRecordType.DELETE && record.getType() != RepositoryRecordType.CONTENTMISSING && record.getDestination() == null) {
@@ -190,17 +217,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
for (final RepositoryRecord record : records) {
if (record.getType() == RepositoryRecordType.DELETE) {
// For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable
- if (record.getCurrentClaim() != null && claimManager.getClaimantCount(record.getCurrentClaim()) <= 0) {
+ if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) {
claimsToAdd.add(record.getCurrentClaim());
}
// If the original claim is different than the current claim and the original claim has a claimant count <= 0, mark it as destructable.
- if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
+ if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && getClaimantCount(record.getOriginalClaim()) <= 0) {
claimsToAdd.add(record.getOriginalClaim());
}
} else if (record.getType() == RepositoryRecordType.UPDATE) {
// if we have an update, and the original is no longer needed, mark original as destructable
- if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
+ if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && getClaimantCount(record.getOriginalClaim()) <= 0) {
claimsToAdd.add(record.getOriginalClaim());
}
}
@@ -212,7 +239,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
BlockingQueue<ContentClaim> claimQueue = claimsAwaitingDestruction.get(partitionKey);
if (claimQueue == null) {
claimQueue = new LinkedBlockingQueue<>();
- BlockingQueue<ContentClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
+ final BlockingQueue<ContentClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
if (existingClaimQueue != null) {
claimQueue = existingClaimQueue;
}
@@ -222,9 +249,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
}
- private void markDestructable(final ContentClaim claim) {
- claimManager.markDestructable(claim);
- }
@Override
public void onSync(final int partitionIndex) {
@@ -307,7 +331,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
for (final RepositoryRecord record : recordList) {
final ContentClaim claim = record.getCurrentClaim();
if (claim != null) {
- claimManager.incrementClaimantCount(claim);
+ claimManager.incrementClaimantCount(claim.getResourceClaim());
}
}
@@ -339,7 +363,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
final long start = System.nanoTime();
final int numRecordsCheckpointed = checkpoint();
final long end = System.nanoTime();
- final long millis = TimeUnit.MILLISECONDS.convert((end - start), TimeUnit.NANOSECONDS);
+ final long millis = TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS);
logger.info("Successfully checkpointed FlowFile Repository with {} records in {} milliseconds",
new Object[]{numRecordsCheckpointed, millis});
} catch (final IOException e) {
@@ -378,9 +402,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
private Map<String, FlowFileQueue> flowFileQueueMap = null;
private long recordsRestored = 0L;
- private final ContentClaimManager claimManager;
+ private final ResourceClaimManager claimManager;
- public WriteAheadRecordSerde(final ContentClaimManager claimManager) {
+ public WriteAheadRecordSerde(final ResourceClaimManager claimManager) {
this.claimManager = claimManager;
}
@@ -518,7 +542,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
- RepositoryRecord record = currentRecordStates.get(recordId);
+ final RepositoryRecord record = currentRecordStates.get(recordId);
ffBuilder.id(recordId);
if (record != null) {
ffBuilder.fromFlowFile(record.getCurrent());
@@ -705,11 +729,16 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
out.write(0);
} else {
out.write(1);
- writeString(claim.getId(), out);
- writeString(claim.getContainer(), out);
- writeString(claim.getSection(), out);
+
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ writeString(resourceClaim.getId(), out);
+ writeString(resourceClaim.getContainer(), out);
+ writeString(resourceClaim.getSection(), out);
+ out.writeLong(claim.getOffset());
+ out.writeLong(claim.getLength());
+
out.writeLong(offset);
- out.writeBoolean(claim.isLossTolerant());
+ out.writeBoolean(resourceClaim.isLossTolerant());
}
}
@@ -726,6 +755,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
final String container = readString(in);
final String section = readString(in);
+
+ final long resourceOffset;
+ final long resourceLength;
+ if (serializationVersion < 7) {
+ resourceOffset = 0L;
+ resourceLength = -1L;
+ } else {
+ resourceOffset = in.readLong();
+ resourceLength = in.readLong();
+ }
+
final long claimOffset = in.readLong();
final boolean lossTolerant;
@@ -735,8 +775,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
lossTolerant = false;
}
- final ContentClaim existingClaim = claimManager.newContentClaim(container, section, claimId, lossTolerant);
- ffBuilder.contentClaim(existingClaim);
+ final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+ final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
+ contentClaim.setLength(resourceLength);
+
+ ffBuilder.contentClaim(contentClaim);
ffBuilder.contentClaimOffset(claimOffset);
} else if (claimExists == -1) {
throw new EOFException();
@@ -785,16 +828,16 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
throw new EOFException();
}
if (firstValue == 0xff && secondValue == 0xff) {
- int ch1 = in.read();
- int ch2 = in.read();
- int ch3 = in.read();
- int ch4 = in.read();
+ final int ch1 = in.read();
+ final int ch2 = in.read();
+ final int ch3 = in.read();
+ final int ch4 = in.read();
if ((ch1 | ch2 | ch3 | ch4) < 0) {
throw new EOFException();
}
- return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
+ return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
} else {
- return ((firstValue << 8) + (secondValue));
+ return (firstValue << 8) + secondValue;
}
}
@@ -834,7 +877,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
@Override
public int getVersion() {
- return 6;
+ return 7;
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
index a8a6963..753e818 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.controller.repository.claim;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* <p>
@@ -28,115 +27,89 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> {
- private final String id;
- private final String container;
- private final String section;
- private final boolean lossTolerant;
- private final AtomicInteger claimantCount = new AtomicInteger(0);
+ private final ResourceClaim resourceClaim;
private final int hashCode;
-
- StandardContentClaim(final String container, final String section, final String id, final boolean lossTolerant) {
- this.container = container.intern();
- this.section = section.intern();
- this.id = id;
- this.lossTolerant = lossTolerant;
-
- hashCode = (int) (17 + 19 * (id.hashCode()) + 19 * container.hashCode() + 19 * section.hashCode());
+ private volatile long offset;
+ private volatile long length;
+
+ public StandardContentClaim(final ResourceClaim resourceClaim, final long offset) {
+ this.resourceClaim = resourceClaim;
+ this.offset = offset;
+ this.length = -1L;
+ this.hashCode = calculateHashCode();
}
- @Override
- public boolean isLossTolerant() {
- return lossTolerant;
+ public void setLength(final long length) {
+ this.length = length;
}
- /**
- * @return the unique identifier for this claim
- */
- @Override
- public String getId() {
- return id;
+ public void setOffset(final long offset) {
+ this.offset = offset;
}
- /**
- * @return the container identifier in which this claim is held
- */
- @Override
- public String getContainer() {
- return container;
+ private int calculateHashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + hashCode;
+ result = prime * result + (int) (length ^ length >>> 32);
+ result = prime * result + (int) (offset ^ offset >>> 32);
+ result = prime * result + (resourceClaim == null ? 0 : resourceClaim.hashCode());
+ return result;
}
- /**
- * @return the section within a given container the claim is held
- */
@Override
- public String getSection() {
- return section;
- }
-
- int getClaimantCount() {
- return claimantCount.get();
- }
-
- int decrementClaimantCount() {
- return claimantCount.decrementAndGet();
- }
-
- int incrementClaimantCount() {
- return claimantCount.incrementAndGet();
+ public int hashCode() {
+ return this.hashCode;
}
- /**
- * Provides the natural ordering for ContentClaim objects. By default they are sorted by their id, then container, then section
- *
- * @param other other claim
- * @return x such that x <=1 if this is less than other;
- * x=0 if this.equals(other);
- * x >= 1 if this is greater than other
- */
@Override
- public int compareTo(final ContentClaim other) {
- final int idComparison = id.compareTo(other.getId());
- if (idComparison != 0) {
- return idComparison;
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
}
- final int containerComparison = container.compareTo(other.getContainer());
- if (containerComparison != 0) {
- return containerComparison;
+ if (obj == null) {
+ return false;
}
- return section.compareTo(other.getSection());
- }
-
- @Override
- public boolean equals(final Object other) {
- if (this == other) {
- return true;
+ if (!(obj instanceof ContentClaim)) {
+ return false;
}
- if (other == null) {
+ final ContentClaim other = (ContentClaim) obj;
+ if (length != other.getLength()) {
return false;
}
- if (hashCode != other.hashCode()) {
- // We check hash code before instanceof because instanceof is fairly expensive and for
- // StandardContentClaim, calling hashCode() simply returns a pre-calculated value.
+
+ if (offset != other.getOffset()) {
return false;
}
- if (!(other instanceof ContentClaim)) {
- return false;
+ return resourceClaim.equals(other.getResourceClaim());
+ }
+
+ @Override
+ public int compareTo(final ContentClaim o) {
+ final int resourceComp = resourceClaim.compareTo(o.getResourceClaim());
+ if (resourceComp != 0) {
+ return resourceComp;
}
- final ContentClaim otherClaim = (ContentClaim) other;
- return id.equals(otherClaim.getId()) && container.equals(otherClaim.getContainer()) && section.equals(otherClaim.getSection());
+
+ return Long.compare(offset, o.getOffset());
}
@Override
- public int hashCode() {
- return hashCode;
+ public ResourceClaim getResourceClaim() {
+ return resourceClaim;
+ }
+
+ @Override
+ public long getOffset() {
+ return offset;
}
@Override
- public String toString() {
- return "ContentClaim[id=" + id + ", container=" + container + ", section=" + section + "]";
+ public long getLength() {
+ return length;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
deleted file mode 100644
index b68f95e..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.claim;
-
-import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StandardContentClaimManager implements ContentClaimManager {
-
- private static final ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
- private static final Logger logger = LoggerFactory.getLogger(StandardContentClaimManager.class);
-
- private static final BlockingQueue<ContentClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
-
- @Override
- public ContentClaim newContentClaim(final String container, final String section, final String id, final boolean lossTolerant) {
- return new StandardContentClaim(container, section, id, lossTolerant);
- }
-
- private static AtomicInteger getCounter(final ContentClaim claim) {
- if (claim == null) {
- return null;
- }
-
- AtomicInteger counter = claimantCounts.get(claim);
- if (counter != null) {
- return counter;
- }
-
- counter = new AtomicInteger(0);
- AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter);
- return (existingCounter == null) ? counter : existingCounter;
- }
-
- @Override
- public int getClaimantCount(final ContentClaim claim) {
- if (claim == null) {
- return 0;
- }
- final AtomicInteger counter = claimantCounts.get(claim);
- return (counter == null) ? 0 : counter.get();
- }
-
- @Override
- public int decrementClaimantCount(final ContentClaim claim) {
- if (claim == null) {
- return 0;
- }
-
- final AtomicInteger counter = claimantCounts.get(claim);
- if (counter == null) {
- logger.debug("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim);
- return -1;
- }
-
- final int newClaimantCount = counter.decrementAndGet();
- logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount);
- if (newClaimantCount == 0) {
- claimantCounts.remove(claim);
- }
- return newClaimantCount;
- }
-
- @Override
- public int incrementClaimantCount(final ContentClaim claim) {
- return incrementClaimantCount(claim, false);
- }
-
- @Override
- public int incrementClaimantCount(final ContentClaim claim, final boolean newClaim) {
- final AtomicInteger counter = getCounter(claim);
-
- final int newClaimantCount = counter.incrementAndGet();
- logger.debug("Incrementing claimant count for {} to {}", claim, newClaimantCount);
- // If the claimant count moved from 0 to 1, remove it from the queue of destructable claims.
- if (!newClaim && newClaimantCount == 1) {
- destructableClaims.remove(claim);
- }
- return newClaimantCount;
- }
-
- @Override
- public void markDestructable(final ContentClaim claim) {
- if (claim == null) {
- return;
- }
-
- if (getClaimantCount(claim) > 0) {
- return;
- }
-
- logger.debug("Marking claim {} as destructable", claim);
- try {
- while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) {
- }
- } catch (final InterruptedException ie) {
- }
- }
-
- @Override
- public void drainDestructableClaims(final Collection<ContentClaim> destination, final int maxElements) {
- final int drainedCount = destructableClaims.drainTo(destination, maxElements);
- logger.debug("Drained {} destructable claims to {}", drainedCount, destination);
- }
-
- @Override
- public void drainDestructableClaims(final Collection<ContentClaim> destination, final int maxElements, final long timeout, final TimeUnit unit) {
- try {
- final ContentClaim firstClaim = destructableClaims.poll(timeout, unit);
- if (firstClaim != null) {
- destination.add(firstClaim);
- destructableClaims.drainTo(destination, maxElements - 1);
- }
- } catch (final InterruptedException e) {
- }
- }
-
- @Override
- public void purge() {
- claimantCounts.clear();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
new file mode 100644
index 0000000..bd3ed5a
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class StandardResourceClaim implements ResourceClaim, Comparable<ResourceClaim> {
+ private final String id;
+ private final String container;
+ private final String section;
+ private final boolean lossTolerant;
+ private final AtomicInteger claimantCount = new AtomicInteger(0);
+ private final int hashCode;
+
+ public StandardResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) {
+ this.container = container.intern();
+ this.section = section.intern();
+ this.id = id;
+ this.lossTolerant = lossTolerant;
+
+ hashCode = 17 + 19 * id.hashCode() + 19 * container.hashCode() + 19 * section.hashCode();
+ }
+
+ @Override
+ public boolean isLossTolerant() {
+ return lossTolerant;
+ }
+
+ /**
+ * @return the unique identifier for this claim
+ */
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * @return the container identifier in which this claim is held
+ */
+ @Override
+ public String getContainer() {
+ return container;
+ }
+
+ /**
+ * @return the section within a given container the claim is held
+ */
+ @Override
+ public String getSection() {
+ return section;
+ }
+
+ int getClaimantCount() {
+ return claimantCount.get();
+ }
+
+ int decrementClaimantCount() {
+ return claimantCount.decrementAndGet();
+ }
+
+ int incrementClaimantCount() {
+ return claimantCount.incrementAndGet();
+ }
+
+ /**
+ * Provides the natural ordering for ResourceClaim objects. By default they are sorted by their id, then container, then section
+ *
+ * @param other other claim
+ * @return x such that x <= -1 if this is less than other;
+ * x=0 if this.equals(other);
+ * x >= 1 if this is greater than other
+ */
+ @Override
+ public int compareTo(final ResourceClaim other) {
+ final int idComparison = id.compareTo(other.getId());
+ if (idComparison != 0) {
+ return idComparison;
+ }
+
+ final int containerComparison = container.compareTo(other.getContainer());
+ if (containerComparison != 0) {
+ return containerComparison;
+ }
+
+ return section.compareTo(other.getSection());
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null) {
+ return false;
+ }
+ if (hashCode != other.hashCode()) {
+ // We check hash code before instanceof because instanceof is fairly expensive and for
+ // StandardResourceClaim, calling hashCode() simply returns a pre-calculated value.
+ return false;
+ }
+
+ if (!(other instanceof ResourceClaim)) {
+ return false;
+ }
+ final ResourceClaim otherClaim = (ResourceClaim) other;
+ return id.equals(otherClaim.getId()) && container.equals(otherClaim.getContainer()) && section.equals(otherClaim.getSection());
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ @Override
+ public String toString() {
+ return "StandardResourceClaim[id=" + id + ", container=" + container + ", section=" + section + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
new file mode 100644
index 0000000..4826ac3
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StandardResourceClaimManager implements ResourceClaimManager {
+
+ private static final ConcurrentMap<ResourceClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
+ private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class);
+
+ private static final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
+
+ @Override
+ public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) {
+ return new StandardResourceClaim(container, section, id, lossTolerant);
+ }
+
+ private static AtomicInteger getCounter(final ResourceClaim claim) {
+ if (claim == null) {
+ return null;
+ }
+
+ AtomicInteger counter = claimantCounts.get(claim);
+ if (counter != null) {
+ return counter;
+ }
+
+ counter = new AtomicInteger(0);
+ final AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter);
+ return existingCounter == null ? counter : existingCounter;
+ }
+
+ @Override
+ public int getClaimantCount(final ResourceClaim claim) {
+ if (claim == null) {
+ return 0;
+ }
+ final AtomicInteger counter = claimantCounts.get(claim);
+ return counter == null ? 0 : counter.get();
+ }
+
+ @Override
+ public int decrementClaimantCount(final ResourceClaim claim) {
+ if (claim == null) {
+ return 0;
+ }
+
+ final AtomicInteger counter = claimantCounts.get(claim);
+ if (counter == null) {
+ logger.debug("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim);
+ return -1;
+ }
+
+ final int newClaimantCount = counter.decrementAndGet();
+ logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount);
+ if (newClaimantCount == 0) {
+ claimantCounts.remove(claim);
+ }
+ return newClaimantCount;
+ }
+
+ @Override
+ public int incrementClaimantCount(final ResourceClaim claim) {
+ return incrementClaimantCount(claim, false);
+ }
+
+ @Override
+ public int incrementClaimantCount(final ResourceClaim claim, final boolean newClaim) {
+ final AtomicInteger counter = getCounter(claim);
+
+ final int newClaimantCount = counter.incrementAndGet();
+ logger.debug("Incrementing claimant count for {} to {}", claim, newClaimantCount);
+ // If the claimant count moved from 0 to 1, remove it from the queue of destructable claims.
+ if (!newClaim && newClaimantCount == 1) {
+ destructableClaims.remove(claim);
+ }
+ return newClaimantCount;
+ }
+
+ @Override
+ public void markDestructable(final ResourceClaim claim) {
+ if (claim == null) {
+ return;
+ }
+
+ if (getClaimantCount(claim) > 0) {
+ return;
+ }
+
+ logger.debug("Marking claim {} as destructable", claim);
+ try {
+ while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) {
+ }
+ } catch (final InterruptedException ie) {
+ }
+ }
+
+ @Override
+ public void drainDestructableClaims(final Collection<ResourceClaim> destination, final int maxElements) {
+ final int drainedCount = destructableClaims.drainTo(destination, maxElements);
+ logger.debug("Drained {} destructable claims to {}", drainedCount, destination);
+ }
+
+ @Override
+ public void drainDestructableClaims(final Collection<ResourceClaim> destination, final int maxElements, final long timeout, final TimeUnit unit) {
+ try {
+ final ResourceClaim firstClaim = destructableClaims.poll(timeout, unit);
+ if (firstClaim != null) {
+ destination.add(firstClaim);
+ destructableClaims.drainTo(destination, maxElements - 1);
+ }
+ } catch (final InterruptedException e) {
+ }
+ }
+
+ @Override
+ public void purge() {
+ claimantCounts.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 364dcad..a17bd40 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -30,8 +30,8 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.junit.Test;
import org.mockito.Mockito;
@@ -48,7 +48,7 @@ public class TestFileSystemSwapManager {
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
- final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, new NopContentClaimManager());
+ final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, new NopResourceClaimManager());
assertEquals(10000, records.size());
for (final FlowFileRecord record : records) {
@@ -58,43 +58,43 @@ public class TestFileSystemSwapManager {
}
}
- public class NopContentClaimManager implements ContentClaimManager {
+ public class NopResourceClaimManager implements ResourceClaimManager {
@Override
- public ContentClaim newContentClaim(String container, String section, String id, boolean lossTolerant) {
+ public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant) {
return null;
}
@Override
- public int getClaimantCount(ContentClaim claim) {
+ public int getClaimantCount(ResourceClaim claim) {
return 0;
}
@Override
- public int decrementClaimantCount(ContentClaim claim) {
+ public int decrementClaimantCount(ResourceClaim claim) {
return 0;
}
@Override
- public int incrementClaimantCount(ContentClaim claim) {
+ public int incrementClaimantCount(ResourceClaim claim) {
return 0;
}
@Override
- public int incrementClaimantCount(ContentClaim claim, boolean newClaim) {
+ public int incrementClaimantCount(ResourceClaim claim, boolean newClaim) {
return 0;
}
@Override
- public void markDestructable(ContentClaim claim) {
+ public void markDestructable(ResourceClaim claim) {
}
@Override
- public void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements) {
+ public void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements) {
}
@Override
- public void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit) {
+ public void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit) {
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index ada0775..519ba9c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -16,11 +16,10 @@
*/
package org.apache.nifi.controller.repository;
-import org.apache.nifi.controller.repository.FileSystemRepository;
-import org.apache.nifi.controller.repository.ContentNotFoundException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
@@ -37,12 +36,15 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Random;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.util.DiskUtils;
+import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
-
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -53,20 +55,25 @@ public class TestFileSystemRepository {
public static final File helloWorldFile = new File("src/test/resources/hello.txt");
private FileSystemRepository repository = null;
+ private final File rootFile = new File("target/content_repository");
@Before
public void setup() throws IOException {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
- final File repo = new File("target/content_repository");
- if (repo.exists()) {
- DiskUtils.deleteRecursively(repo);
+ if (rootFile.exists()) {
+ DiskUtils.deleteRecursively(rootFile);
}
repository = new FileSystemRepository();
- repository.initialize(new StandardContentClaimManager());
+ repository.initialize(new StandardResourceClaimManager());
repository.purge();
}
+ /**
+  * Shuts down the repository used by the test that just ran.
+  */
+ @After
+ public void shutdown() throws IOException {
+     // JUnit still invokes @After methods when @Before fails; in that case no repository
+     // was created and there is nothing to shut down
+     if (repository != null) {
+         repository.shutdown();
+     }
+ }
+
@Test
public void testCreateContentClaim() throws IOException {
// value passed to #create is irrelevant because the FileSystemRepository does not currently support loss tolerance.
@@ -98,6 +105,127 @@ public class TestFileSystemRepository {
}
@Test
+ public void testResourceClaimReused() throws IOException {
+     final ContentClaim first = repository.create(false);
+     final ContentClaim second = repository.create(false);
+
+     // 'first' may still be in use at this point, so 'second' must not share its resource claim
+     assertNotSame(first.getResourceClaim(), second.getResourceClaim());
+
+     // open and immediately close an output stream for 'first'
+     try (final OutputStream ignored = repository.write(first)) {
+     }
+
+     // a claim created after that is expected to land on the resource claim of 'first'
+     final ContentClaim third = repository.create(false);
+     assertEquals(first.getResourceClaim(), third.getResourceClaim());
+ }
+
+ /**
+  * Opens and closes a write on a claim, shuts the repository down, starts a fresh repository and
+  * checks that a claim created afterwards does not use the same resource claim instance.
+  */
+ @Test
+ public void testResourceClaimNotReusedAfterRestart() throws IOException, InterruptedException {
+     final ContentClaim beforeRestart = repository.create(false);
+     try (final OutputStream ignored = repository.write(beforeRestart)) {
+     }
+
+     repository.shutdown();
+     Thread.sleep(1000L);
+
+     // bring up a brand-new repository in place of the one that was shut down
+     repository = new FileSystemRepository();
+     repository.initialize(new StandardResourceClaimManager());
+     repository.purge();
+
+     final ContentClaim afterRestart = repository.create(false);
+     // NOTE(review): this compares object identity only; confirm whether an equals()-based check was intended
+     assertNotSame(beforeRestart.getResourceClaim(), afterRestart.getResourceClaim());
+ }
+
+ /**
+  * Writes to a single content claim three times and checks after each step that the claimant count
+  * stays at 1 and that reading and sizing the claim reflect only the latest write. Afterwards the
+  * claim is released and removed, the backing file must be gone, and a newly created claim must
+  * not use the same resource claim instance.
+  */
+ @Test
+ public void testRewriteContentClaim() throws IOException {
+     final ContentClaim claim = repository.create(false);
+     assertEquals(1, repository.getClaimantCount(claim));
+
+     try (final OutputStream stream = repository.write(claim)) {
+         stream.write("abc".getBytes());
+     }
+     assertEquals(1, repository.getClaimantCount(claim));
+
+     try (final OutputStream stream = repository.write(claim)) {
+         stream.write("cba".getBytes());
+     }
+     assertEquals(1, repository.getClaimantCount(claim));
+
+     // only the bytes of the second write are expected to come back
+     try (final InputStream stream = repository.read(claim)) {
+         for (final char expected : new char[] {'c', 'b', 'a'}) {
+             assertEquals(expected, stream.read());
+         }
+     }
+     assertEquals(1, repository.getClaimantCount(claim));
+
+     assertEquals(3, repository.size(claim));
+
+     final int payloadLength = 1024 * 1024 - 6;
+     final byte[] payload = new byte[payloadLength];
+     new Random().nextBytes(payload);
+     try (final OutputStream stream = repository.write(claim)) {
+         stream.write(payload);
+     }
+     assertEquals(1, repository.getClaimantCount(claim));
+
+     assertEquals(payloadLength, repository.size(claim));
+     try (final InputStream stream = repository.read(claim)) {
+         final byte[] readBack = new byte[payload.length];
+         StreamUtils.fillBuffer(stream, readBack);
+         assertTrue(Arrays.equals(readBack, payload));
+     }
+
+     // the file backing the claim must exist until the last claimant is gone and the claim is removed
+     final ResourceClaim backing = claim.getResourceClaim();
+     final Path backingFile = rootFile.toPath().resolve(backing.getSection()).resolve(backing.getId());
+     assertTrue(Files.exists(backingFile));
+     assertEquals(0, repository.decrementClaimantCount(claim));
+     assertTrue(repository.remove(claim));
+     assertFalse(Files.exists(backingFile));
+
+     final ContentClaim next = repository.create(false);
+     assertNotSame(claim.getResourceClaim(), next.getResourceClaim());
+ }
+
+ /**
+  * Creates three content claims that are asserted to share one resource claim, writing "Hello",
+  * nothing, and " World" through them respectively, and verifies that each claim reads back
+  * exactly what was written through it.
+  */
+ @Test
+ public void testWriteWithNoContent() throws IOException {
+     final ContentClaim helloClaim = repository.create(false);
+     try (final OutputStream stream = repository.write(helloClaim)) {
+         stream.write("Hello".getBytes());
+     }
+
+     final ContentClaim emptyClaim = repository.create(false);
+     assertEquals(helloClaim.getResourceClaim(), emptyClaim.getResourceClaim());
+     try (final OutputStream ignored = repository.write(emptyClaim)) {
+         // deliberately write nothing
+     }
+
+     final ContentClaim worldClaim = repository.create(false);
+     assertEquals(helloClaim.getResourceClaim(), worldClaim.getResourceClaim());
+     try (final OutputStream stream = repository.write(worldClaim)) {
+         stream.write(" World".getBytes());
+     }
+
+     // one buffer is reused for all three read-backs and reset in between
+     final ByteArrayOutputStream captured = new ByteArrayOutputStream();
+     try (final InputStream stream = repository.read(helloClaim)) {
+         StreamUtils.copy(stream, captured);
+     }
+
+     assertEquals("Hello", captured.toString());
+
+     captured.reset();
+     try (final InputStream stream = repository.read(emptyClaim)) {
+         StreamUtils.copy(stream, captured);
+     }
+     assertEquals("", captured.toString());
+     assertEquals(0, captured.size());
+
+     captured.reset();
+     try (final InputStream stream = repository.read(worldClaim)) {
+         StreamUtils.copy(stream, captured);
+     }
+     assertEquals(" World", captured.toString());
+ }
+
+ @Test
public void testRemoveDeletesFileIfNoClaimants() throws IOException {
final ContentClaim claim = repository.create(true);
assertNotNull(claim);
@@ -155,6 +283,40 @@ public class TestFileSystemRepository {
final byte[] data = Files.readAllBytes(path);
final byte[] expected = Files.readAllBytes(testFile.toPath());
assertTrue(Arrays.equals(expected, data));
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (final InputStream in = repository.read(claim)) {
+ StreamUtils.copy(in, baos);
+ }
+
+ assertTrue(Arrays.equals(expected, baos.toByteArray()));
+ }
+
+ /**
+  * Imports two resource files into one claim, three times with the last argument {@code true}
+  * and once with {@code false}, checking the complete claim content after every import.
+  */
+ @Test
+ public void testImportFromFileWithAppend() throws IOException {
+     final ContentClaim target = repository.create(false);
+     final Path helloPath = new File("src/test/resources/hello.txt").toPath();
+     final Path goodbyePath = new File("src/test/resources/bye.txt").toPath();
+
+     // with 'true' each import is expected to extend the existing content
+     repository.importFrom(helloPath, target, true);
+     assertContentEquals(target, "Hello, World");
+
+     repository.importFrom(goodbyePath, target, true);
+     assertContentEquals(target, "Hello, WorldGood-Bye, World!");
+
+     repository.importFrom(helloPath, target, true);
+     assertContentEquals(target, "Hello, WorldGood-Bye, World!Hello, World");
+
+     // with 'false' the previous content is expected to be replaced
+     repository.importFrom(goodbyePath, target, false);
+     assertContentEquals(target, "Good-Bye, World!");
+ }
+
+ /**
+  * Reads the full content of {@code claim} from the repository and asserts that, decoded as
+  * UTF-8, it equals {@code expected}.
+  *
+  * @param claim the content claim whose content is read
+  * @param expected the text the claim is expected to contain
+  * @throws IOException if reading the claim fails
+  */
+ private void assertContentEquals(final ContentClaim claim, final String expected) throws IOException {
+     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+     try (final InputStream in = repository.read(claim)) {
+         StreamUtils.copy(in, baos);
+     }
+     // decode with a fixed charset so the comparison does not depend on the platform default
+     assertEquals(expected, new String(baos.toByteArray(), java.nio.charset.StandardCharsets.UTF_8));
}
@Test
@@ -314,9 +476,9 @@ public class TestFileSystemRepository {
}
final ContentClaim destination = repository.create(true);
- final byte[] headerBytes = (header == null) ? null : header.getBytes();
- final byte[] footerBytes = (footer == null) ? null : footer.getBytes();
- final byte[] demarcatorBytes = (demarcator == null) ? null : demarcator.getBytes();
+ final byte[] headerBytes = header == null ? null : header.getBytes();
+ final byte[] footerBytes = footer == null ? null : footer.getBytes();
+ final byte[] demarcatorBytes = demarcator == null ? null : demarcator.getBytes();
repository.merge(claims, destination, headerBytes, footerBytes, demarcatorBytes);
final StringBuilder sb = new StringBuilder();
@@ -334,8 +496,12 @@ public class TestFileSystemRepository {
}
final String expectedText = sb.toString();
final byte[] expected = expectedText.getBytes();
- final Path path = getPath(destination);
- final byte[] actual = Files.readAllBytes(path);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) destination.getLength());
+ try (final InputStream in = repository.read(destination)) {
+ StreamUtils.copy(in, baos);
+ }
+ final byte[] actual = baos.toByteArray();
assertTrue(Arrays.equals(expected, actual));
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 3486875..b1fd4c7 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -51,8 +51,11 @@ import org.apache.nifi.controller.FlowFileQueue;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.StandardFlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.Relationship;
@@ -179,7 +182,7 @@ public class TestStandardProcessSession {
when(connectable.getConnections()).thenReturn(new HashSet<>(connList));
contentRepo = new MockContentRepository();
- contentRepo.initialize(new StandardContentClaimManager());
+ contentRepo.initialize(new StandardResourceClaimManager());
flowFileRepo = new MockFlowFileRepository();
final ProcessContext context = new ProcessContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
@@ -194,10 +197,10 @@ public class TestStandardProcessSession {
assertEquals(1, contentRepo.getExistingClaims().size());
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .contentClaim(claim)
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .contentClaim(claim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
@@ -295,10 +298,10 @@ public class TestStandardProcessSession {
public void testAppendAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .contentClaim(claim)
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .contentClaim(claim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
@@ -316,12 +319,12 @@ public class TestStandardProcessSession {
public void testReadAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .contentClaim(claim)
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .contentClaim(claim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
flowFileQueue.put(flowFileRecord);
- FlowFile flowFile = session.get();
+ final FlowFile flowFile = session.get();
assertNotNull(flowFile);
final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null);
session.read(flowFile, new InputStreamCallback() {
@@ -337,10 +340,10 @@ public class TestStandardProcessSession {
public void testStreamAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .contentClaim(claim)
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .contentClaim(claim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
@@ -361,10 +364,10 @@ public class TestStandardProcessSession {
public void testWriteAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .contentClaim(claim)
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .contentClaim(claim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
@@ -382,9 +385,9 @@ public class TestStandardProcessSession {
public void testCreateThenRollbackRemovesContent() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
flowFileQueue.put(flowFileRecord);
final StreamCallback nop = new StreamCallback() {
@@ -402,7 +405,7 @@ public class TestStandardProcessSession {
session.write(flowFile2, nop);
- FlowFile flowFile3 = session.create();
+ final FlowFile flowFile3 = session.create();
session.write(flowFile3, nop);
session.rollback();
@@ -412,14 +415,14 @@ public class TestStandardProcessSession {
@Test
public void testForksNotEmittedIfFilesDeleted() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
flowFileQueue.put(flowFileRecord);
- FlowFile orig = session.get();
- FlowFile newFlowFile = session.create(orig);
+ final FlowFile orig = session.get();
+ final FlowFile newFlowFile = session.create(orig);
session.remove(newFlowFile);
session.commit();
@@ -429,14 +432,14 @@ public class TestStandardProcessSession {
@Test
public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
flowFileQueue.put(flowFileRecord);
- FlowFile orig = session.get();
- FlowFile newFlowFile = session.create(orig);
+ final FlowFile orig = session.get();
+ final FlowFile newFlowFile = session.create(orig);
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
session.commit();
@@ -446,15 +449,15 @@ public class TestStandardProcessSession {
@Test
public void testProvenanceEventsEmittedForRemove() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
flowFileQueue.put(flowFileRecord);
- FlowFile orig = session.get();
- FlowFile newFlowFile = session.create(orig);
- FlowFile secondNewFlowFile = session.create(orig);
+ final FlowFile orig = session.get();
+ final FlowFile newFlowFile = session.create(orig);
+ final FlowFile secondNewFlowFile = session.create(orig);
session.remove(newFlowFile);
session.transfer(secondNewFlowFile, new Relationship.Builder().name("A").build());
session.commit();
@@ -465,16 +468,16 @@ public class TestStandardProcessSession {
@Test
public void testUpdateAttributesThenJoin() throws IOException {
final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
- .id(1L)
- .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
- .entryDate(System.currentTimeMillis())
- .build();
+ .id(1L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+ .entryDate(System.currentTimeMillis())
+ .build();
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
- .id(2L)
- .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
- .entryDate(System.currentTimeMillis())
- .build();
+ .id(2L)
+ .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
+ .entryDate(System.currentTimeMillis())
+ .build();
flowFileQueue.put(flowFileRecord1);
flowFileQueue.put(flowFileRecord2);
@@ -538,17 +541,17 @@ public class TestStandardProcessSession {
@Test
public void testForkOneToOneReported() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
flowFileQueue.put(flowFileRecord);
// we have to increment the ID generator because we are creating a FlowFile without the FlowFile Repository's knowledge
flowFileRepo.idGenerator.getAndIncrement();
- FlowFile orig = session.get();
- FlowFile newFlowFile = session.create(orig);
+ final FlowFile orig = session.get();
+ final FlowFile newFlowFile = session.create(orig);
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
session.getProvenanceReporter().fork(newFlowFile, Collections.singleton(orig));
session.remove(orig);
@@ -566,7 +569,7 @@ public class TestStandardProcessSession {
@Test
public void testProcessExceptionThrownIfCallbackThrowsInOutputStreamCallback() {
- FlowFile ff1 = session.create();
+ final FlowFile ff1 = session.create();
final RuntimeException runtime = new RuntimeException();
try {
@@ -610,7 +613,7 @@ public class TestStandardProcessSession {
@Test
public void testProcessExceptionThrownIfCallbackThrowsInStreamCallback() {
- FlowFile ff1 = session.create();
+ final FlowFile ff1 = session.create();
final RuntimeException runtime = new RuntimeException();
try {
@@ -655,41 +658,16 @@ public class TestStandardProcessSession {
@Test
public void testMissingFlowFileExceptionThrownWhenUnableToReadData() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .contentClaim(new ContentClaim() {
- @Override
- public int compareTo(ContentClaim arg0) {
- return 0;
- }
-
- @Override
- public String getId() {
- return "0";
- }
-
- @Override
- public String getContainer() {
- return "x";
- }
-
- @Override
- public String getSection() {
- return "x";
- }
-
- @Override
- public boolean isLossTolerant() {
- return true;
- }
- })
- .size(1L)
- .build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
+ .size(1L)
+ .build();
flowFileQueue.put(flowFileRecord);
// attempt to read the data.
try {
- FlowFile ff1 = session.get();
+ final FlowFile ff1 = session.get();
session.read(ff1, new InputStreamCallback() {
@Override
@@ -704,41 +682,16 @@ public class TestStandardProcessSession {
@Test
public void testMissingFlowFileExceptionThrownWhenUnableToReadDataStreamCallback() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .contentClaim(new ContentClaim() {
- @Override
- public int compareTo(ContentClaim arg0) {
- return 0;
- }
-
- @Override
- public String getId() {
- return "0";
- }
-
- @Override
- public String getContainer() {
- return "x";
- }
-
- @Override
- public String getSection() {
- return "x";
- }
-
- @Override
- public boolean isLossTolerant() {
- return true;
- }
- })
- .size(1L)
- .build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
+ .size(1L)
+ .build();
flowFileQueue.put(flowFileRecord);
// attempt to read the data.
try {
- FlowFile ff1 = session.get();
+ final FlowFile ff1 = session.get();
session.write(ff1, new StreamCallback() {
@Override
@@ -753,35 +706,10 @@ public class TestStandardProcessSession {
@Test
public void testContentNotFoundExceptionThrownWhenUnableToReadDataStreamCallbackOffsetTooLarge() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .contentClaim(new ContentClaim() {
- @Override
- public int compareTo(ContentClaim arg0) {
- return 0;
- }
-
- @Override
- public String getId() {
- return "0";
- }
-
- @Override
- public String getContainer() {
- return "container";
- }
-
- @Override
- public String getSection() {
- return "section";
- }
-
- @Override
- public boolean isLossTolerant() {
- return true;
- }
- })
- .build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
+ .build();
flowFileQueue.put(flowFileRecord);
FlowFile ff1 = session.get();
@@ -794,43 +722,18 @@ public class TestStandardProcessSession {
session.commit();
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .contentClaim(new ContentClaim() {
- @Override
- public int compareTo(ContentClaim arg0) {
- return 0;
- }
-
- @Override
- public String getId() {
- return "0";
- }
-
- @Override
- public String getContainer() {
- return "container";
- }
-
- @Override
- public String getSection() {
- return "section";
- }
-
- @Override
- public boolean isLossTolerant() {
- return true;
- }
- })
- .contentClaimOffset(1000L)
- .size(1000L)
- .build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
+ .contentClaimOffset(1000L)
+ .size(1000L)
+ .build();
flowFileQueue.put(flowFileRecord2);
// attempt to read the data.
try {
session.get();
- FlowFile ff2 = session.get();
+ final FlowFile ff2 = session.get();
session.write(ff2, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
@@ -844,34 +747,11 @@ public class TestStandardProcessSession {
@Test
public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .contentClaim(new ContentClaim() {
- @Override
- public int compareTo(ContentClaim arg0) {
- return 0;
- }
-
- @Override
- public String getId() {
- return "0";
- }
-
- @Override
- public String getContainer() {
- return "container";
- }
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
+ .build();
- @Override
- public String getSection() {
- return "section";
- }
-
- @Override
- public boolean isLossTolerant() {
- return true;
- }
- }).build();
flowFileQueue.put(flowFileRecord);
FlowFile ff1 = session.get();
@@ -884,41 +764,17 @@ public class TestStandardProcessSession {
session.commit();
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .contentClaim(new ContentClaim() {
- @Override
- public int compareTo(ContentClaim arg0) {
- return 0;
- }
-
- @Override
- public String getId() {
- return "0";
- }
-
- @Override
- public String getContainer() {
- return "container";
- }
-
- @Override
- public String getSection() {
- return "section";
- }
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
- @Override
- public boolean isLossTolerant() {
- return true;
- }
- })
- .contentClaimOffset(1000L).size(1L).build();
+ .contentClaimOffset(1000L).size(1L).build();
flowFileQueue.put(flowFileRecord2);
// attempt to read the data.
try {
session.get();
- FlowFile ff2 = session.get();
+ final FlowFile ff2 = session.get();
session.read(ff2, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
@@ -931,7 +787,7 @@ public class TestStandardProcessSession {
@Test
public void testProcessExceptionThrownIfCallbackThrowsInInputStreamCallback() {
- FlowFile ff1 = session.create();
+ final FlowFile ff1 = session.create();
final RuntimeException runtime = new RuntimeException();
try {
@@ -975,7 +831,7 @@ public class TestStandardProcessSession {
@Test
public void testCreateEmitted() throws IOException {
- FlowFile newFlowFile = session.create();
+ final FlowFile newFlowFile = session.create();
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
session.commit();
@@ -1009,9 +865,9 @@ public class TestStandardProcessSession {
@Test
public void testContentModifiedEmittedAndNotAttributesModified() throws IOException {
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
- .id(1L)
- .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
- .build();
+ .id(1L)
+ .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+ .build();
this.flowFileQueue.put(flowFile);
FlowFile existingFlowFile = session.get();
@@ -1035,9 +891,9 @@ public class TestStandardProcessSession {
@Test
public void testAttributesModifiedEmitted() throws IOException {
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
- .id(1L)
- .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
- .build();
+ .id(1L)
+ .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+ .build();
this.flowFileQueue.put(flowFile);
FlowFile existingFlowFile = session.get();
@@ -1104,7 +960,7 @@ public class TestStandardProcessSession {
}
@Override
- public void initialize(ContentClaimManager claimManager) throws IOException {
+ public void initialize(ResourceClaimManager claimManager) throws IOException {
}
}
@@ -1112,9 +968,9 @@ public class TestStandardProcessSession {
private final AtomicLong idGenerator = new AtomicLong(0L);
private final AtomicLong claimsRemoved = new AtomicLong(0L);
- private ContentClaimManager claimManager;
+ private ResourceClaimManager claimManager;
- private ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
@Override
public void shutdown() {
@@ -1124,9 +980,10 @@ public class TestStandardProcessSession {
final Set<ContentClaim> claims = new HashSet<>();
for (long i = 0; i < idGenerator.get(); i++) {
- final ContentClaim claim = claimManager.newContentClaim("container", "section", String.valueOf(i), false);
- if (getClaimantCount(claim) > 0) {
- claims.add(claim);
+ final ResourceClaim resourceClaim = new StandardResourceClaim("container", "section", String.valueOf(i), false);
+ final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
+ if (getClaimantCount(contentClaim) > 0) {
+ claims.add(contentClaim);
}
}
@@ -1135,15 +992,17 @@ public class TestStandardProcessSession {
@Override
public ContentClaim create(boolean lossTolerant) throws IOException {
- final ContentClaim claim = claimManager.newContentClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false);
- claimantCounts.put(claim, new AtomicInteger(1));
- final Path path = getPath(claim);
+ final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false);
+ final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
+
+ claimantCounts.put(contentClaim, new AtomicInteger(1));
+ final Path path = getPath(contentClaim);
final Path parent = path.getParent();
if (Files.exists(parent) == false) {
Files.createDirectories(parent);
}
- Files.createFile(getPath(claim));
- return claim;
+ Files.createFile(getPath(contentClaim));
+ return contentClaim;
}
@Override
@@ -1219,7 +1078,8 @@ public class TestStandardProcessSession {
return 0;
}
- private Path getPath(final ContentClaim claim) {
+ private Path getPath(final ContentClaim contentClaim) {
+ final ResourceClaim claim = contentClaim.getResourceClaim();
return Paths.get("target").resolve("contentRepo").resolve(claim.getContainer()).resolve(claim.getSection()).resolve(claim.getId());
}
@@ -1315,7 +1175,7 @@ public class TestStandardProcessSession {
}
@Override
- public void initialize(ContentClaimManager claimManager) throws IOException {
+ public void initialize(ResourceClaimManager claimManager) throws IOException {
this.claimManager = claimManager;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
index a32f321..5733164 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.repository.VolatileContentRepository;
+
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
@@ -29,10 +30,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.util.NiFiProperties;
-
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -41,11 +43,11 @@ import org.mockito.Mockito;
public class TestVolatileContentRepository {
- private ContentClaimManager claimManager;
+ private ResourceClaimManager claimManager;
@Before
public void setup() {
- claimManager = new StandardContentClaimManager();
+ claimManager = new StandardResourceClaimManager();
}
@Test
@@ -81,8 +83,9 @@ public class TestVolatileContentRepository {
final ContentRepository mockRepo = Mockito.mock(ContentRepository.class);
contentRepo.setBackupRepository(mockRepo);
- final ContentClaim newClaim = claimManager.newContentClaim("container", "section", "1000", true);
- Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(newClaim);
+ final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true);
+ final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
+ Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(contentClaim);
final ByteArrayOutputStream overflowStream = new ByteArrayOutputStream();
Mockito.when(mockRepo.write(Matchers.any(ContentClaim.class))).thenReturn(overflowStream);
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 054ef5e..2138928 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -34,7 +34,7 @@ import java.util.List;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.util.file.FileUtils;
import org.junit.Test;
@@ -53,7 +53,7 @@ public class TestWriteAheadFlowFileRepository {
}
final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository();
- repo.initialize(new StandardContentClaimManager());
+ repo.initialize(new StandardResourceClaimManager());
final List<Connection> connectionList = new ArrayList<>();
final QueueProvider queueProvider = new QueueProvider() {
@@ -119,7 +119,7 @@ public class TestWriteAheadFlowFileRepository {
// restore
final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository();
- repo2.initialize(new StandardContentClaimManager());
+ repo2.initialize(new StandardResourceClaimManager());
repo2.loadFlowFiles(queueProvider, 0L);
assertEquals(1, flowFileCollection.size());
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/bye.txt
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/bye.txt b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/bye.txt
new file mode 100644
index 0000000..64926ce
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/bye.txt
@@ -0,0 +1 @@
+Good-Bye, World!
\ No newline at end of file
[3/3] nifi git commit: NIFI-744: Refactored ContentClaim into
ContentClaim and ResourceClaim so that we can append to a single file in the
FileSystemRepository even after a session is completed
Posted by ma...@apache.org.
NIFI-744: Refactored ContentClaim into ContentClaim and ResourceClaim so that we can append to a single file in the FileSystemRepository even after a session is completed
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/50379fcc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/50379fcc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/50379fcc
Branch: refs/heads/NIFI-744
Commit: 50379fcc06347f413f14615f710cda6d02a6c8ce
Parents: 75ed16c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jul 29 15:59:25 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jul 29 15:59:25 2015 -0400
----------------------------------------------------------------------
.../repository/ContentRepository.java | 4 +-
.../repository/FlowFileRepository.java | 4 +-
.../repository/FlowFileSwapManager.java | 6 +-
.../repository/claim/ContentClaim.java | 35 +-
.../repository/claim/ContentClaimManager.java | 142 -----
.../repository/claim/ResourceClaim.java | 54 ++
.../repository/claim/ResourceClaimManager.java | 135 +++++
.../stream/io/ByteCountingOutputStream.java | 4 +
.../nifi/controller/FileSystemSwapManager.java | 63 ++-
.../apache/nifi/controller/FlowController.java | 167 +++---
.../repository/FileSystemRepository.java | 566 ++++++++++++++-----
.../repository/StandardFlowFileRecord.java | 6 +-
.../repository/StandardProcessSession.java | 318 +++--------
.../repository/VolatileContentRepository.java | 49 +-
.../repository/VolatileFlowFileRepository.java | 46 +-
.../WriteAheadFlowFileRepository.java | 101 +++-
.../repository/claim/StandardContentClaim.java | 133 ++---
.../claim/StandardContentClaimManager.java | 145 -----
.../repository/claim/StandardResourceClaim.java | 134 +++++
.../claim/StandardResourceClaimManager.java | 145 +++++
.../controller/TestFileSystemSwapManager.java | 24 +-
.../repository/TestFileSystemRepository.java | 192 ++++++-
.../repository/TestStandardProcessSession.java | 378 ++++---------
.../TestVolatileContentRepository.java | 17 +-
.../TestWriteAheadFlowFileRepository.java | 6 +-
.../src/test/resources/bye.txt | 1 +
26 files changed, 1646 insertions(+), 1229 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index ee3ead9..da87d75 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -24,7 +24,7 @@ import java.util.Collection;
import java.util.Set;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
/**
* Defines the capabilities of a content repository. Append options are not
@@ -42,7 +42,7 @@ public interface ContentRepository {
* @param claimManager to handle claims
* @throws java.io.IOException if unable to init
*/
- void initialize(ContentClaimManager claimManager) throws IOException;
+ void initialize(ResourceClaimManager claimManager) throws IOException;
/**
* Shuts down the Content Repository, freeing any resources that may be
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 5e59e04..58fc6b3 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -22,7 +22,7 @@ import java.util.Collection;
import java.util.List;
import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
/**
* Implementations must be thread safe
@@ -38,7 +38,7 @@ public interface FlowFileRepository extends Closeable {
* @param claimManager for handling claims
* @throws java.io.IOException if unable to initialize repository
*/
- void initialize(ContentClaimManager claimManager) throws IOException;
+ void initialize(ResourceClaimManager claimManager) throws IOException;
/**
* @return the maximum number of bytes that can be stored in the underlying
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 869e2b3..2e5be11 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.controller.repository;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
/**
@@ -38,7 +38,7 @@ public interface FlowFileSwapManager {
* @param reporter the EventReporter that can be used for notifying users of
* important events
*/
- void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter);
+ void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ResourceClaimManager claimManager, EventReporter reporter);
/**
* Shuts down the manager
@@ -59,5 +59,5 @@ public interface FlowFileSwapManager {
* @param claimManager manager
* @return how many flowfiles have been recovered
*/
- long recoverSwappedFlowFiles(QueueProvider connectionProvider, ContentClaimManager claimManager);
+ long recoverSwappedFlowFiles(QueueProvider connectionProvider, ResourceClaimManager claimManager);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
index 53cc44f..6a2ef7f 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
@@ -18,35 +18,30 @@ package org.apache.nifi.controller.repository.claim;
/**
* <p>
- * A ContentClaim is a reference to a given flow file's content. Multiple flow
- * files may reference the same content by both having the same content
- * claim.</p>
+ * A reference to a section of a {@link ResourceClaim}, which may or may not encompass
+ * the entire ResourceClaim. Multiple FlowFiles may reference the same content by both
+ * having the same content claim.
+ * </p>
*
* <p>
- * Must be thread safe</p>
- *
+ * Must be thread safe
+ * </p>
*/
public interface ContentClaim extends Comparable<ContentClaim> {
/**
- * @return the unique identifier for this claim
- */
- String getId();
-
- /**
- * @return the container identifier in which this claim is held
+ * @return the ResourceClaim that this ContentClaim references
*/
- String getContainer();
-
+ ResourceClaim getResourceClaim();
+
/**
- * @return the section within a given container the claim is held
+ * @return the offset into the ResourceClaim where the content for this
+ * claim begins
*/
- String getSection();
-
+ long getOffset();
+
/**
- * @return Indicates whether or not the Claim is loss-tolerant. If so, we will
- * attempt to keep the content but will not sacrifice a great deal of
- * performance to do so
+ * @return the length of this ContentClaim
*/
- boolean isLossTolerant();
+ long getLength();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
deleted file mode 100644
index bffcec3..0000000
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.claim;
-
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Responsible for managing all ContentClaims that are used in the application
- */
-public interface ContentClaimManager {
-
- /**
- * Creates a new Content Claim with the given id, container, section, and
- * loss tolerance.
- *
- * @param id of claim
- * @param container of claim
- * @param section of claim
- * @param lossTolerant of claim
- * @return new claim
- */
- ContentClaim newContentClaim(String container, String section, String id, boolean lossTolerant);
-
- /**
- * @param claim to obtain reference count for
- * @return the number of FlowFiles that hold a claim to a particular piece
- * of FlowFile content
- */
- int getClaimantCount(ContentClaim claim);
-
- /**
- * Decreases by 1 the count of how many FlowFiles hold a claim to a
- * particular piece of FlowFile content and returns the new count
- *
- * @param claim to decrement claimants on
- * @return new claimaint count
- */
- int decrementClaimantCount(ContentClaim claim);
-
- /**
- * Increases by 1 the count of how many FlowFiles hold a claim to a
- * particular piece of FlowFile content and returns the new count
- *
- * @param claim to increment claims on
- * @return new claimant count
- */
- int incrementClaimantCount(ContentClaim claim);
-
- /**
- * Increases by 1 the count of how many FlowFiles hold a claim to a
- * particular piece of FlowFile content and returns the new count.
- *
- * If it is known that the Content Claim whose count is being incremented is
- * a newly created ContentClaim, this method should be called with a value
- * of {@code true} as the second argument, as it may allow the manager to
- * optimize its tasks, knowing that the Content Claim cannot be referenced
- * by any other component
- *
- * @param claim to increment
- * @param newClaim provides a hint that no other process can have access to this
- * claim right now
- * @return new claim count
- */
- int incrementClaimantCount(ContentClaim claim, boolean newClaim);
-
- /**
- * Indicates that the given ContentClaim can now be destroyed by the
- * appropriate Content Repository. This should be done only after it is
- * guaranteed that the FlowFile Repository has been synchronized with its
- * underlying storage component. This way, we avoid the following sequence
- * of events:
- * <ul>
- * <li>FlowFile Repository is updated to indicate that FlowFile F no longer
- * depends on ContentClaim C</li>
- * <li>ContentClaim C is no longer needed and is destroyed</li>
- * <li>The Operating System crashes or there is a power failure</li>
- * <li>Upon restart, the FlowFile Repository was not synchronized with its
- * underlying storage mechanism and as such indicates that FlowFile F needs
- * ContentClaim C.</li>
- * <li>Since ContentClaim C has already been destroyed, it is inaccessible,
- * and FlowFile F's Content is not found, so the FlowFile is removed,
- * resulting in data loss.</li>
- * </ul>
- *
- * <p>
- * Using this method of marking the ContentClaim as destructable only when
- * the FlowFile repository has been synced with the underlying storage
- * mechanism, we can ensure that on restart, we will not point to this
- * unneeded claim. As such, it is now safe to destroy the contents.
- * </p>
- *
- * @param claim to mark as now destructable
- */
- void markDestructable(ContentClaim claim);
-
- /**
- * Drains up to {@code maxElements} Content Claims from the internal queue
- * of destructable content claims to the given {@code destination} so that
- * they can be destroyed.
- *
- * @param destination to drain to
- * @param maxElements max items to drain
- */
- void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements);
-
- /**
- * Drains up to {@code maxElements} Content Claims from the internal queue
- * of destructable content claims to the given {@code destination} so that
- * they can be destroyed. If no ContentClaim is ready to be destroyed at
- * this time, will wait up to the specified amount of time before returning.
- * If, after the specified amount of time, there is still no ContentClaim
- * ready to be destroyed, the method will return without having added
- * anything to the given {@code destination}.
- *
- * @param destination to drain to
- * @param maxElements max items to drain
- * @param timeout maximum time to wait
- * @param unit unit of time to wait
- */
- void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit);
-
- /**
- * Clears the manager's memory of any and all ContentClaims that it knows
- * about
- */
- void purge();
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
new file mode 100644
index 0000000..d448632
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import org.apache.nifi.controller.repository.ContentRepository;
+
+/**
+ * <p>
+ * Represents a resource that can be provided by a {@link ContentRepository}
+ * </p>
+ *
+ * <p>
+ * MUST BE THREAD-SAFE!
+ * </p>
+ */
+public interface ResourceClaim extends Comparable<ResourceClaim> {
+
+ /**
+ * @return the unique identifier for this claim
+ */
+ String getId();
+
+ /**
+ * @return the container identifier in which this claim is held
+ */
+ String getContainer();
+
+ /**
+ * @return the section within a given container the claim is held
+ */
+ String getSection();
+
+ /**
+ * @return Indicates whether or not the Claim is loss-tolerant. If so, we will
+ * attempt to keep the content but will not sacrifice a great deal of
+ * performance to do so
+ */
+ boolean isLossTolerant();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
new file mode 100644
index 0000000..01f4c65
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Responsible for managing all ResourceClaims that are used in the application
+ */
+public interface ResourceClaimManager {
+
+ /**
+ * Creates a new Resource Claim with the given id, container, section, and
+ * loss tolerance.
+ *
+ * @param id of claim
+ * @param container of claim
+ * @param section of claim
+ * @param lossTolerant of claim
+ * @return new claim
+ */
+ ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant);
+
+ /**
+ * @param claim to obtain reference count for
+ * @return the number of FlowFiles that hold a claim to a particular piece
+ * of FlowFile content
+ */
+ int getClaimantCount(ResourceClaim claim);
+
+ /**
+ * Decreases by 1 the count of how many FlowFiles hold a claim to a
+ * particular piece of FlowFile content and returns the new count
+ *
+ * @param claim to decrement claimants on
+ * @return new claimant count
+ */
+ int decrementClaimantCount(ResourceClaim claim);
+
+ /**
+ * Increases by 1 the count of how many FlowFiles hold a claim to a
+ * particular piece of FlowFile content and returns the new count
+ *
+ * @param claim to increment claims on
+ * @return new claimant count
+ */
+ int incrementClaimantCount(ResourceClaim claim);
+
+ /**
+ * Increases by 1 the count of how many FlowFiles hold a claim to a
+ * particular piece of FlowFile content and returns the new count.
+ *
+ * If it is known that the Resource Claim whose count is being incremented is
+ * a newly created ResourceClaim, this method should be called with a value
+ * of {@code true} as the second argument, as it may allow the manager to
+ * optimize its tasks, knowing that the Resource Claim cannot be referenced
+ * by any other component
+ *
+ * @param claim to increment
+ * @param newClaim provides a hint that no other process can have access to this
+ * claim right now
+ * @return new claim count
+ */
+ int incrementClaimantCount(ResourceClaim claim, boolean newClaim);
+
+ /**
+ * Indicates that the given ResourceClaim can now be destroyed by the
+ * appropriate Content Repository. This should be done only after it is
+ * guaranteed that the FlowFile Repository has been synchronized with its
+ * underlying storage component. This way, we avoid the following sequence
+ * of events:
+ * <ul>
+ * <li>FlowFile Repository is updated to indicate that FlowFile F no longer depends on ResourceClaim C</li>
+ * <li>ResourceClaim C is no longer needed and is destroyed</li>
+ * <li>The Operating System crashes or there is a power failure</li>
+ * <li>Upon restart, the FlowFile Repository was not synchronized with its underlying storage mechanism and as such indicates that FlowFile F needs ResourceClaim C.</li>
+ * <li>Since ResourceClaim C has already been destroyed, it is inaccessible, and FlowFile F's Content is not found, so the FlowFile is removed, resulting in data loss.</li>
+ * </ul>
+ *
+ * <p>
+ * Using this method of marking the ResourceClaim as destructable only when the FlowFile repository has been synced with the underlying storage mechanism, we can ensure that on restart, we will
+ * not point to this unneeded claim. As such, it is now safe to destroy the contents.
+ * </p>
+ *
+ * @param claim to mark as now destructable
+ */
+ void markDestructable(ResourceClaim claim);
+
+ /**
+ * Drains up to {@code maxElements} Resource Claims from the internal queue
+ * of destructable resource claims to the given {@code destination} so that
+ * they can be destroyed.
+ *
+ * @param destination to drain to
+ * @param maxElements max items to drain
+ */
+ void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements);
+
+ /**
+ * Drains up to {@code maxElements} Resource Claims from the internal queue
+ * of destructable resource claims to the given {@code destination} so that
+ * they can be destroyed. If no ResourceClaim is ready to be destroyed at
+ * this time, will wait up to the specified amount of time before returning.
+ * If, after the specified amount of time, there is still no ResourceClaim
+ * ready to be destroyed, the method will return without having added
+ * anything to the given {@code destination}.
+ *
+ * @param destination to drain to
+ * @param maxElements max items to drain
+ * @param timeout maximum time to wait
+ * @param unit unit of time to wait
+ */
+ void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit);
+
+ /**
+ * Clears the manager's memory of any and all ResourceClaims that it knows
+ * about
+ */
+ void purge();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
index 9bbd45e..4865e98 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
@@ -63,4 +63,8 @@ public class ByteCountingOutputStream extends OutputStream {
public void close() throws IOException {
out.close();
}
+
+ public OutputStream getWrappedStream() {
+ return out;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 604dba9..c829566 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -61,7 +61,9 @@ import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.QueueProvider;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.stream.io.BufferedOutputStream;
@@ -83,7 +85,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
- public static final int SWAP_ENCODING_VERSION = 6;
+ public static final int SWAP_ENCODING_VERSION = 7;
public static final String EVENT_CATEGORY = "Swap FlowFiles";
private final ScheduledExecutorService swapQueueIdentifierExecutor;
@@ -98,7 +100,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
private final long swapOutMillis;
private final int swapOutThreadCount;
- private ContentClaimManager claimManager; // effectively final
+ private ResourceClaimManager claimManager; // effectively final
private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
@@ -138,7 +140,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
@Override
- public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) {
+ public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ResourceClaimManager claimManager, final EventReporter eventReporter) {
this.claimManager = claimManager;
this.flowFileRepository = flowFileRepository;
this.eventReporter = eventReporter;
@@ -184,11 +186,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
- out.writeUTF(claim.getId());
- out.writeUTF(claim.getContainer());
- out.writeUTF(claim.getSection());
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ out.writeUTF(resourceClaim.getId());
+ out.writeUTF(resourceClaim.getContainer());
+ out.writeUTF(resourceClaim.getSection());
+ out.writeLong(claim.getOffset());
+ out.writeLong(claim.getLength());
out.writeLong(flowFile.getContentClaimOffset());
- out.writeBoolean(claim.isLossTolerant());
+ out.writeBoolean(resourceClaim.isLossTolerant());
}
final Map<String, String> attributes = flowFile.getAttributes();
@@ -226,7 +231,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
- static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ContentClaimManager claimManager) throws IOException {
+ static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
final int swapEncodingVersion = in.readInt();
if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
@@ -245,7 +250,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, final FlowFileQueue queue,
- final int serializationVersion, final boolean incrementContentClaims, final ContentClaimManager claimManager) throws IOException {
+ final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException {
final List<FlowFileRecord> flowFiles = new ArrayList<>();
for (int i = 0; i < numFlowFiles; i++) {
// legacy encoding had an "action" because it used to be couple with FlowFile Repository code
@@ -292,6 +297,17 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final String container = in.readUTF();
final String section = in.readUTF();
+
+ final long resourceOffset;
+ final long resourceLength;
+ if (serializationVersion < 6) {
+ resourceOffset = 0L;
+ resourceLength = -1L;
+ } else {
+ resourceOffset = in.readLong();
+ resourceLength = in.readLong();
+ }
+
final long claimOffset = in.readLong();
final boolean lossTolerant;
@@ -301,10 +317,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
lossTolerant = false;
}
- final ContentClaim claim = claimManager.newContentClaim(container, section, claimId, lossTolerant);
+ final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+ final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
+ claim.setLength(resourceLength);
if (incrementContentClaims) {
- claimManager.incrementClaimantCount(claim);
+ claimManager.incrementClaimantCount(resourceClaim);
}
ffBuilder.contentClaim(claim);
@@ -353,16 +371,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
throw new EOFException();
}
if (firstValue == 0xff && secondValue == 0xff) {
- int ch1 = in.read();
- int ch2 = in.read();
- int ch3 = in.read();
- int ch4 = in.read();
+ final int ch1 = in.read();
+ final int ch2 = in.read();
+ final int ch3 = in.read();
+ final int ch4 = in.read();
if ((ch1 | ch2 | ch3 | ch4) < 0) {
throw new EOFException();
}
- return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
+ return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
} else {
- return ((firstValue << 8) + (secondValue));
+ return (firstValue << 8) + secondValue;
}
}
@@ -422,7 +440,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final FlowFileQueue flowFileQueue = entry.getKey();
// if queue is more than 60% of its swap threshold, don't swap flowfiles in
- if (flowFileQueue.unswappedSize() >= ((float) flowFileQueue.getSwapThreshold() * 0.6F)) {
+ if (flowFileQueue.unswappedSize() >= flowFileQueue.getSwapThreshold() * 0.6F) {
continue;
}
@@ -432,7 +450,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final Queue<File> queue = queueLockWrapper.getQueue();
// Swap FlowFiles in until we hit 90% of the threshold, or until we're out of files.
- while (flowFileQueue.unswappedSize() < ((float) flowFileQueue.getSwapThreshold() * 0.9F)) {
+ while (flowFileQueue.unswappedSize() < flowFileQueue.getSwapThreshold() * 0.9F) {
File swapFile = null;
try {
swapFile = queue.poll();
@@ -545,7 +563,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
QueueLockWrapper swapQueue = swapMap.get(flowFileQueue);
if (swapQueue == null) {
swapQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>());
- QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue);
+ final QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue);
if (oldQueue != null) {
swapQueue = oldQueue;
}
@@ -567,7 +585,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
* @return the largest FlowFile ID that was recovered
*/
@Override
- public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ContentClaimManager claimManager) {
+ public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ResourceClaimManager claimManager) {
final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
@Override
public boolean accept(final File dir, final String name) {
@@ -680,6 +698,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
+ @Override
public void shutdown() {
swapQueueIdentifierExecutor.shutdownNow();
swapInExecutor.shutdownNow();
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 3d78b3a..815f97e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -98,9 +98,12 @@ import org.apache.nifi.controller.repository.StandardCounterRepository;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.ContentDirection;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
@@ -282,7 +285,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final NodeProtocolSender protocolSender;
private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
- private final ContentClaimManager contentClaimManager = new StandardContentClaimManager();
+ private final ResourceClaimManager contentClaimManager = new StandardResourceClaimManager();
// guarded by rwLock
/**
@@ -495,7 +498,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
}
- private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ContentClaimManager contentClaimManager) {
+ private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) {
final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: "
@@ -3108,7 +3111,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public boolean isInputAvailable() {
try {
- return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier()));
+ return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(),
+ event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset()));
} catch (final IOException e) {
return false;
}
@@ -3117,43 +3121,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public boolean isOutputAvailable() {
try {
- return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(), event.getContentClaimIdentifier()));
+ return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(),
+ event.getContentClaimIdentifier(), event.getContentClaimOffset()));
} catch (final IOException e) {
return false;
}
}
- private ContentClaim createClaim(final String container, final String section, final String identifier) {
+ private ContentClaim createClaim(final String container, final String section, final String identifier, final Long offset) {
if (container == null || section == null || identifier == null) {
return null;
}
- return new ContentClaim() {
- @Override
- public int compareTo(final ContentClaim o) {
- return 0;
- }
-
- @Override
- public String getId() {
- return identifier;
- }
-
- @Override
- public String getContainer() {
- return container;
- }
-
- @Override
- public String getSection() {
- return section;
- }
-
- @Override
- public boolean isLossTolerant() {
- return false;
- }
- };
+ final StandardResourceClaim resourceClaim = new StandardResourceClaim(container, section, identifier, false);
+ return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue());
}
@Override
@@ -3170,45 +3151,48 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
requireNonNull(requestUri);
final ContentClaim claim;
- final Long offset;
final long size;
+ final long offset;
if (direction == ContentDirection.INPUT) {
if (provEvent.getPreviousContentClaimContainer() == null || provEvent.getPreviousContentClaimSection() == null || provEvent.getPreviousContentClaimIdentifier() == null) {
throw new IllegalArgumentException("Input Content Claim not specified");
}
- claim = contentClaimManager.newContentClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), provEvent.getPreviousContentClaimIdentifier(), false);
- offset = provEvent.getPreviousContentClaimOffset();
+ final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
+ provEvent.getPreviousContentClaimIdentifier(), false);
+ claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
+ offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
size = provEvent.getPreviousFileSize();
} else {
if (provEvent.getContentClaimContainer() == null || provEvent.getContentClaimSection() == null || provEvent.getContentClaimIdentifier() == null) {
throw new IllegalArgumentException("Output Content Claim not specified");
}
- claim = contentClaimManager.newContentClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), provEvent.getContentClaimIdentifier(), false);
- offset = provEvent.getContentClaimOffset();
+ final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
+ provEvent.getContentClaimIdentifier(), false);
+
+ claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
+ offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset();
size = provEvent.getFileSize();
}
final InputStream rawStream = contentRepository.read(claim);
- if (offset != null) {
- StreamUtils.skip(rawStream, offset.longValue());
- }
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
// Register a Provenance Event to indicate that we replayed the data.
final ProvenanceEventRecord sendEvent = new StandardProvenanceEventRecord.Builder()
- .setEventType(ProvenanceEventType.SEND)
- .setFlowFileUUID(provEvent.getFlowFileUuid())
- .setAttributes(provEvent.getAttributes(), Collections.<String, String>emptyMap())
- .setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), offset, size)
- .setTransitUri(requestUri)
- .setEventTime(System.currentTimeMillis())
- .setFlowFileEntryDate(provEvent.getFlowFileEntryDate())
- .setLineageStartDate(provEvent.getLineageStartDate())
- .setComponentType(getName())
- .setComponentId(getRootGroupId())
- .setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId())
- .build();
+ .setEventType(ProvenanceEventType.SEND)
+ .setFlowFileUUID(provEvent.getFlowFileUuid())
+ .setAttributes(provEvent.getAttributes(), Collections.<String, String> emptyMap())
+ .setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, size)
+ .setTransitUri(requestUri)
+ .setEventTime(System.currentTimeMillis())
+ .setFlowFileEntryDate(provEvent.getFlowFileEntryDate())
+ .setLineageStartDate(provEvent.getLineageStartDate())
+ .setComponentType(getName())
+ .setComponentId(getRootGroupId())
+ .setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId())
+ .build();
provenanceEventRepository.registerEvent(sendEvent);
@@ -3233,7 +3217,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
try {
- if (!contentRepository.isAccessible(contentClaimManager.newContentClaim(contentClaimContainer, contentClaimSection, contentClaimId, false))) {
+ final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
+ final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
+
+ if (!contentRepository.isAccessible(contentClaim)) {
return "Content is no longer available in Content Repository";
}
} catch (final IOException ioe) {
@@ -3310,18 +3297,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// Create the ContentClaim
- final ContentClaim claim = contentClaimManager.newContentClaim(event.getPreviousContentClaimContainer(),
+ final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
// Increment Claimant Count, since we will now be referencing the Content Claim
- contentClaimManager.incrementClaimantCount(claim);
+ contentClaimManager.incrementClaimantCount(resourceClaim);
+ final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
+ final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, claimOffset);
+ contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize());
- if (!contentRepository.isAccessible(claim)) {
- contentClaimManager.decrementClaimantCount(claim);
+ if (!contentRepository.isAccessible(contentClaim)) {
+ contentClaimManager.decrementClaimantCount(resourceClaim);
throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository");
}
- final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
final String parentUUID = event.getFlowFileUuid();
// Create the FlowFile Record
@@ -3331,39 +3320,39 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final String newFlowFileUUID = UUID.randomUUID().toString();
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- // Copy relevant info from source FlowFile
- .addAttributes(event.getPreviousAttributes())
- .contentClaim(claim)
- .contentClaimOffset(claimOffset)
- .entryDate(System.currentTimeMillis())
- .id(flowFileRepository.getNextFlowFileSequence())
- .lineageIdentifiers(lineageIdentifiers)
- .lineageStartDate(event.getLineageStartDate())
- .size(contentSize.longValue())
- // Create a new UUID and add attributes indicating that this is a replay
- .addAttribute("flowfile.replay", "true")
- .addAttribute("flowfile.replay.timestamp", String.valueOf(new Date()))
- .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID)
- // remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile
- .removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key())
- // build the record
- .build();
+ // Copy relevant info from source FlowFile
+ .addAttributes(event.getPreviousAttributes())
+ .contentClaim(contentClaim)
+ .contentClaimOffset(0L) // use 0 because we used the content claim offset in the Content Claim itself
+ .entryDate(System.currentTimeMillis())
+ .id(flowFileRepository.getNextFlowFileSequence())
+ .lineageIdentifiers(lineageIdentifiers)
+ .lineageStartDate(event.getLineageStartDate())
+ .size(contentSize.longValue())
+ // Create a new UUID and add attributes indicating that this is a replay
+ .addAttribute("flowfile.replay", "true")
+ .addAttribute("flowfile.replay.timestamp", String.valueOf(new Date()))
+ .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID)
+ // remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile
+ .removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key())
+ // build the record
+ .build();
// Register a Provenance Event to indicate that we replayed the data.
final ProvenanceEventRecord replayEvent = new StandardProvenanceEventRecord.Builder()
- .setEventType(ProvenanceEventType.REPLAY)
- .addChildUuid(newFlowFileUUID)
- .addParentUuid(parentUUID)
- .setFlowFileUUID(parentUUID)
- .setAttributes(Collections.<String, String>emptyMap(), flowFileRecord.getAttributes())
- .setCurrentContentClaim(event.getContentClaimSection(), event.getContentClaimContainer(), event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize())
- .setDetails("Replay requested by " + requestor)
- .setEventTime(System.currentTimeMillis())
- .setFlowFileEntryDate(System.currentTimeMillis())
- .setLineageStartDate(event.getLineageStartDate())
- .setComponentType(event.getComponentType())
- .setComponentId(event.getComponentId())
- .build();
+ .setEventType(ProvenanceEventType.REPLAY)
+ .addChildUuid(newFlowFileUUID)
+ .addParentUuid(parentUUID)
+ .setFlowFileUUID(parentUUID)
+ .setAttributes(Collections.<String, String> emptyMap(), flowFileRecord.getAttributes())
+ .setCurrentContentClaim(event.getContentClaimSection(), event.getContentClaimContainer(), event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize())
+ .setDetails("Replay requested by " + requestor)
+ .setEventTime(System.currentTimeMillis())
+ .setFlowFileEntryDate(System.currentTimeMillis())
+ .setLineageStartDate(event.getLineageStartDate())
+ .setComponentType(event.getComponentType())
+ .setComponentId(event.getComponentId())
+ .build();
provenanceEventRepository.registerEvent(replayEvent);
// Update the FlowFile Repository to indicate that we have added the FlowFile to the flow
[2/3] nifi git commit: NIFI-744: Refactored ContentClaim into
ContentClaim and ResourceClaim so that we can append to a single file in the
FileSystemRepository even after a session is completed
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 1171636..3a6338c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -23,14 +23,11 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
-import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
@@ -61,18 +58,20 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.controller.repository.io.SyncOnCloseOutputStream;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
-
-import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,8 +91,11 @@ public class FileSystemRepository implements ContentRepository {
private final AtomicLong index;
private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true);
- private final ConcurrentMap<String, BlockingQueue<ContentClaim>> reclaimable = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, BlockingQueue<ResourceClaim>> reclaimable = new ConcurrentHashMap<>();
private final Map<String, ContainerState> containerStateMap = new HashMap<>();
+ private final long maxAppendClaimLength = 1024L * 1024L; // 1 MB
+ private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100);
+ private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100);
private final boolean archiveData;
private final long maxArchiveMillis;
@@ -101,7 +103,7 @@ public class FileSystemRepository implements ContentRepository {
private final boolean alwaysSync;
private final ScheduledExecutorService containerCleanupExecutor;
- private ContentClaimManager contentClaimManager; // effectively final
+ private ResourceClaimManager resourceClaimManager; // effectively final
// Map of contianer to archived files that should be deleted next.
private final Map<String, BlockingQueue<ArchiveInfo>> archivedFiles = new HashMap<>();
@@ -113,7 +115,7 @@ public class FileSystemRepository implements ContentRepository {
final NiFiProperties properties = NiFiProperties.getInstance();
// determine the file repository paths and ensure they exist
final Map<String, Path> fileRespositoryPaths = properties.getContentRepositoryPaths();
- for (Path path : fileRespositoryPaths.values()) {
+ for (final Path path : fileRespositoryPaths.values()) {
Files.createDirectories(path);
}
@@ -122,7 +124,7 @@ public class FileSystemRepository implements ContentRepository {
index = new AtomicLong(0L);
for (final String containerName : containerNames) {
- reclaimable.put(containerName, new LinkedBlockingQueue<ContentClaim>(10000));
+ reclaimable.put(containerName, new LinkedBlockingQueue<ResourceClaim>(10000));
archivedFiles.put(containerName, new LinkedBlockingQueue<ArchiveInfo>(100000));
}
@@ -196,8 +198,8 @@ public class FileSystemRepository implements ContentRepository {
}
@Override
- public void initialize(final ContentClaimManager claimManager) {
- this.contentClaimManager = claimManager;
+ public void initialize(final ResourceClaimManager claimManager) {
+ this.resourceClaimManager = claimManager;
final NiFiProperties properties = NiFiProperties.getInstance();
@@ -231,6 +233,13 @@ public class FileSystemRepository implements ContentRepository {
public void shutdown() {
executor.shutdown();
containerCleanupExecutor.shutdown();
+
+ for (final OutputStream out : writableClaimStreams.values()) {
+ try {
+ out.close();
+ } catch (final IOException ioe) {
+ }
+ }
}
private static double getRatio(final String value) {
@@ -397,8 +406,8 @@ public class FileSystemRepository implements ContentRepository {
final String id = idPath.toFile().getName();
final String sectionName = sectionPath.toFile().getName();
- final ContentClaim contentClaim = contentClaimManager.newContentClaim(containerName, sectionName, id, false);
- if (contentClaimManager.getClaimantCount(contentClaim) == 0) {
+ final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false);
+ if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) {
removeIncompleteContent(fileToRemove);
}
}
@@ -427,15 +436,21 @@ public class FileSystemRepository implements ContentRepository {
}
private Path getPath(final ContentClaim claim) {
- final Path containerPath = containers.get(claim.getContainer());
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ return getPath(resourceClaim);
+ }
+
+ private Path getPath(final ResourceClaim resourceClaim) {
+ final Path containerPath = containers.get(resourceClaim.getContainer());
if (containerPath == null) {
return null;
}
- return containerPath.resolve(claim.getSection()).resolve(claim.getId());
+ return containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
}
private Path getPath(final ContentClaim claim, final boolean verifyExists) throws ContentNotFoundException {
- final Path containerPath = containers.get(claim.getContainer());
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ final Path containerPath = containers.get(resourceClaim.getContainer());
if (containerPath == null) {
if (verifyExists) {
throw new ContentNotFoundException(claim);
@@ -445,11 +460,11 @@ public class FileSystemRepository implements ContentRepository {
}
// Create the Path that points to the data
- Path resolvedPath = containerPath.resolve(claim.getSection()).resolve(String.valueOf(claim.getId()));
+ Path resolvedPath = containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
// If the data does not exist, create a Path that points to where the data would exist in the archive directory.
if (!Files.exists(resolvedPath)) {
- resolvedPath = getArchivePath(claim);
+ resolvedPath = getArchivePath(claim.getResourceClaim());
}
if (verifyExists && !Files.exists(resolvedPath)) {
@@ -460,34 +475,55 @@ public class FileSystemRepository implements ContentRepository {
@Override
public ContentClaim create(final boolean lossTolerant) throws IOException {
- final long currentIndex = index.incrementAndGet();
-
- String containerName = null;
- boolean waitRequired = true;
- ContainerState containerState = null;
- for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) {
- final long modulatedContainerIndex = containerIndex % containers.size();
- containerName = containerNames.get((int) modulatedContainerIndex);
-
- containerState = containerStateMap.get(containerName);
- if (!containerState.isWaitRequired()) {
- waitRequired = false;
- break;
- }
- }
+ ResourceClaim resourceClaim;
+
+ // We need to synchronize on this queue because the act of pulling something off
+ // the queue and incrementing the associated claimant count MUST be done atomically.
+ // This way, if the claimant count is decremented to 0, we can ensure that the
+ // claim is not then pulled from the queue and used as another thread is destroying/archiving
+ // the claim.
+ final long resourceOffset;
+ synchronized (writableClaimQueue) {
+ final ClaimLengthPair pair = writableClaimQueue.poll();
+ if (pair == null) {
+ final long currentIndex = index.incrementAndGet();
+
+ String containerName = null;
+ boolean waitRequired = true;
+ ContainerState containerState = null;
+ for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) {
+ final long modulatedContainerIndex = containerIndex % containers.size();
+ containerName = containerNames.get((int) modulatedContainerIndex);
+
+ containerState = containerStateMap.get(containerName);
+ if (!containerState.isWaitRequired()) {
+ waitRequired = false;
+ break;
+ }
+ }
- if (waitRequired) {
- containerState.waitForArchiveExpiration();
- }
+ if (waitRequired) {
+ containerState.waitForArchiveExpiration();
+ }
- final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER;
- final String section = String.valueOf(modulatedSectionIndex);
- final String claimId = System.currentTimeMillis() + "-" + currentIndex;
+ final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER;
+ final String section = String.valueOf(modulatedSectionIndex);
+ final String claimId = System.currentTimeMillis() + "-" + currentIndex;
- final ContentClaim claim = contentClaimManager.newContentClaim(containerName, section, claimId, lossTolerant);
- contentClaimManager.incrementClaimantCount(claim, true);
+ resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant);
+ resourceOffset = 0L;
+ LOG.debug("Creating new Resource Claim {}", resourceClaim);
+ } else {
+ resourceClaim = pair.getClaim();
+ resourceOffset = pair.getLength();
+ LOG.debug("Reusing Resource Claim {}", resourceClaim);
+ }
+
+ resourceClaimManager.incrementClaimantCount(resourceClaim, true);
+ }
- return claim;
+ final StandardContentClaim scc = new StandardContentClaim(resourceClaim, resourceOffset);
+ return scc;
}
@Override
@@ -496,7 +532,7 @@ public class FileSystemRepository implements ContentRepository {
return 0;
}
- return contentClaimManager.incrementClaimantCount(claim);
+ return resourceClaimManager.incrementClaimantCount(claim.getResourceClaim());
}
@Override
@@ -504,7 +540,7 @@ public class FileSystemRepository implements ContentRepository {
if (claim == null) {
return 0;
}
- return contentClaimManager.getClaimantCount(claim);
+ return resourceClaimManager.getClaimantCount(claim.getResourceClaim());
}
@Override
@@ -513,7 +549,7 @@ public class FileSystemRepository implements ContentRepository {
return 0;
}
- final int claimantCount = contentClaimManager.decrementClaimantCount(claim);
+ final int claimantCount = resourceClaimManager.decrementClaimantCount(claim.getResourceClaim());
return claimantCount;
}
@@ -523,9 +559,30 @@ public class FileSystemRepository implements ContentRepository {
return false;
}
+ return remove(claim.getResourceClaim());
+ }
+
+ private boolean remove(final ResourceClaim claim) {
+ if (claim == null) {
+ return false;
+ }
+
+ // we synchronize on the queue here because if the claimant count is 0,
+ // we need to be able to remove any instance of that resource claim from the
+ // queue atomically (i.e., the checking of the claimant count plus removal from the queue
+ // must be atomic)
+ synchronized (writableClaimQueue) {
+ final int claimantCount = resourceClaimManager.getClaimantCount(claim);
+ if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+ // if other content claims are claiming the same resource, we have nothing to destroy,
+ // so just consider the destruction successful.
+ return true;
+ }
+ }
+
Path path = null;
try {
- path = getPath(claim, false);
+ path = getPath(claim);
} catch (final ContentNotFoundException cnfe) {
}
@@ -538,6 +595,7 @@ public class FileSystemRepository implements ContentRepository {
return true;
}
+
@Override
public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException {
if (original == null) {
@@ -545,14 +603,11 @@ public class FileSystemRepository implements ContentRepository {
}
final ContentClaim newClaim = create(lossTolerant);
- final Path currPath = getPath(original, true);
- final Path newPath = getPath(newClaim);
- try (final FileOutputStream fos = new FileOutputStream(newPath.toFile())) {
- Files.copy(currPath, fos);
- if (alwaysSync) {
- fos.getFD().sync();
- }
+ try (final InputStream in = read(original);
+ final OutputStream out = write(newClaim)) {
+ StreamUtils.copy(in, out);
} catch (final IOException ioe) {
+ decrementClaimantCount(newClaim);
remove(newClaim);
throw ioe;
}
@@ -564,44 +619,28 @@ public class FileSystemRepository implements ContentRepository {
if (claims.contains(destination)) {
throw new IllegalArgumentException("destination cannot be within claims");
}
- try (final FileChannel dest = FileChannel.open(getPath(destination), StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
- long position = 0L;
- if (header != null && header.length > 0) {
- final ByteBuffer buffer = ByteBuffer.wrap(header);
- while (buffer.hasRemaining()) {
- position += dest.write(buffer, position);
- }
+
+ try (final ByteCountingOutputStream out = new ByteCountingOutputStream(write(destination))) {
+ if (header != null) {
+ out.write(header);
}
- int objectIndex = 0;
+
+ int i = 0;
for (final ContentClaim claim : claims) {
- long totalCopied = 0L;
- try (final FileChannel src = FileChannel.open(getPath(claim, true), StandardOpenOption.READ)) {
- while (totalCopied < src.size()) {
- final long copiedThisIteration = dest.transferFrom(src, position, Long.MAX_VALUE);
- totalCopied += copiedThisIteration;
- position += copiedThisIteration;
- }
+ try (final InputStream in = read(claim)) {
+ StreamUtils.copy(in, out);
}
- // don't add demarcator after the last claim
- if (demarcator != null && demarcator.length > 0 && (++objectIndex < claims.size())) {
- final ByteBuffer buffer = ByteBuffer.wrap(demarcator);
- while (buffer.hasRemaining()) {
- position += dest.write(buffer, position);
- }
- }
- }
- if (footer != null && footer.length > 0) {
- final ByteBuffer buffer = ByteBuffer.wrap(footer);
- while (buffer.hasRemaining()) {
- position += dest.write(buffer, position);
+
+ if (++i < claims.size() && demarcator != null) {
+ out.write(demarcator);
}
}
- if (alwaysSync) {
- dest.force(true);
+ if (footer != null) {
+ out.write(footer);
}
- return position;
+ return out.getBytesWritten();
}
}
@@ -624,12 +663,8 @@ public class FileSystemRepository implements ContentRepository {
@Override
public long importFrom(final InputStream content, final ContentClaim claim, final boolean append) throws IOException {
- try (final FileOutputStream out = new FileOutputStream(getPath(claim).toFile(), append)) {
- final long copied = StreamUtils.copy(content, out);
- if (alwaysSync) {
- out.getFD().sync();
- }
- return copied;
+ try (final OutputStream out = write(claim, append)) {
+ return StreamUtils.copy(content, out);
}
}
@@ -642,20 +677,14 @@ public class FileSystemRepository implements ContentRepository {
Files.createFile(destination);
return 0L;
}
- if (append) {
- try (final FileChannel sourceChannel = FileChannel.open(getPath(claim, true), StandardOpenOption.READ);
- final FileChannel destinationChannel = FileChannel.open(destination, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
- long position = destinationChannel.size();
- final long targetSize = position + sourceChannel.size();
- while (position < targetSize) {
- final long bytesCopied = destinationChannel.transferFrom(sourceChannel, position, Long.MAX_VALUE);
- position += bytesCopied;
- }
- return position;
+
+ try (final InputStream in = read(claim);
+ final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) {
+ final long copied = StreamUtils.copy(in, fos);
+ if (alwaysSync) {
+ fos.getFD().sync();
}
- } else {
- Files.copy(getPath(claim, true), destination, StandardCopyOption.REPLACE_EXISTING);
- return Files.size(destination);
+ return copied;
}
}
@@ -674,28 +703,20 @@ public class FileSystemRepository implements ContentRepository {
final long claimSize = size(claim);
if (offset > claimSize) {
- throw new IllegalArgumentException("offset of " + offset + " exceeds claim size of " + claimSize);
+ throw new IllegalArgumentException("Offset of " + offset + " exceeds claim size of " + claimSize);
}
- if (append) {
- try (final InputStream sourceStream = Files.newInputStream(getPath(claim, true), StandardOpenOption.READ);
- final OutputStream destinationStream = Files.newOutputStream(destination, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
- StreamUtils.skip(sourceStream, offset);
-
- final byte[] buffer = new byte[8192];
- int len;
- long copied = 0L;
- while ((len = sourceStream.read(buffer, 0, (int) Math.min(length - copied, buffer.length))) > 0) {
- destinationStream.write(buffer, 0, len);
- copied += len;
- }
- return copied;
+ try (final InputStream in = read(claim);
+ final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) {
+ if (offset > 0) {
+ StreamUtils.skip(in, offset);
}
- } else {
- try (final OutputStream out = Files.newOutputStream(destination, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
- return exportTo(claim, out, offset, length);
+ StreamUtils.copy(in, fos, length);
+ if (alwaysSync) {
+ fos.getFD().sync();
}
+ return length;
}
}
@@ -704,7 +725,10 @@ public class FileSystemRepository implements ContentRepository {
if (claim == null) {
return 0L;
}
- return Files.copy(getPath(claim, true), destination);
+
+ try (final InputStream in = read(claim)) {
+ return StreamUtils.copy(in, destination);
+ }
}
@Override
@@ -719,7 +743,7 @@ public class FileSystemRepository implements ContentRepository {
if (offset == 0 && length == claimSize) {
return exportTo(claim, destination);
}
- try (final InputStream in = Files.newInputStream(getPath(claim, true))) {
+ try (final InputStream in = read(claim)) {
StreamUtils.skip(in, offset);
final byte[] buffer = new byte[8192];
int len;
@@ -738,7 +762,12 @@ public class FileSystemRepository implements ContentRepository {
return 0L;
}
- return Files.size(getPath(claim, true));
+ // see javadocs for claim.getLength() as to why we do this.
+ if (claim.getLength() < 0) {
+ return Files.size(getPath(claim, true));
+ }
+
+ return claim.getLength();
}
@Override
@@ -747,16 +776,198 @@ public class FileSystemRepository implements ContentRepository {
return new ByteArrayInputStream(new byte[0]);
}
final Path path = getPath(claim, true);
- return new FileInputStream(path.toFile());
+ final FileInputStream fis = new FileInputStream(path.toFile());
+ if (claim.getOffset() > 0L) {
+ StreamUtils.skip(fis, claim.getOffset());
+ }
+
+ // see javadocs for claim.getLength() as to why we do this.
+ if (claim.getLength() >= 0) {
+ return new LimitedInputStream(fis, claim.getLength());
+ } else {
+ return fis;
+ }
}
@Override
- @SuppressWarnings("resource")
public OutputStream write(final ContentClaim claim) throws IOException {
- final FileOutputStream fos = new FileOutputStream(getPath(claim).toFile());
- return alwaysSync ? new SyncOnCloseOutputStream(fos) : fos;
+ return write(claim, false);
}
+ private OutputStream write(final ContentClaim claim, final boolean append) throws IOException {
+ if (claim == null) {
+ throw new NullPointerException("ContentClaim cannot be null");
+ }
+
+ if (!(claim instanceof StandardContentClaim)) {
+ // we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything
+ // else, just throw an Exception because it is not valid for this Repository
+ throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does belong to this Content Repository");
+ }
+
+ final StandardContentClaim scc = (StandardContentClaim) claim;
+
+ // we always append because there may be another ContentClaim using the same resource claim.
+ // However, we know that we will never write to the same claim from two different threads
+ // at the same time because we will call create() to get the claim before we write to it,
+ // and when we call create(), it will remove it from the Queue, which means that no other
+ // thread will get the same Claim until we've finished writing to it.
+ ByteCountingOutputStream claimStream = writableClaimStreams.remove(scc.getResourceClaim());
+ final long initialLength;
+ if (claimStream == null) {
+ final File file = getPath(scc).toFile();
+ claimStream = new ByteCountingOutputStream(new FileOutputStream(file, true), file.length());
+ initialLength = 0L;
+ } else {
+ if (append) {
+ initialLength = Math.max(0, scc.getLength());
+ } else {
+ initialLength = 0;
+ scc.setOffset(claimStream.getBytesWritten());
+ }
+ }
+
+ final ByteCountingOutputStream bcos = claimStream;
+ final OutputStream out = new OutputStream() {
+ private long bytesWritten = 0L;
+ private boolean recycle = true;
+ private boolean closed = false;
+
+ @Override
+ public String toString() {
+ return "FileSystemRepository Stream [" + scc + "]";
+ }
+
+ @Override
+ public synchronized void write(final int b) throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed");
+ }
+
+ try {
+ bcos.write(b);
+ } catch (final IOException ioe) {
+ recycle = false;
+ throw new IOException("Failed to write to " + this, ioe);
+ }
+
+ bytesWritten++;
+ scc.setLength(bytesWritten + initialLength);
+ }
+
+ @Override
+ public synchronized void write(final byte[] b) throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed");
+ }
+
+ try {
+ bcos.write(b);
+ } catch (final IOException ioe) {
+ recycle = false;
+ throw new IOException("Failed to write to " + this, ioe);
+ }
+
+ bytesWritten += b.length;
+ scc.setLength(bytesWritten + initialLength);
+ }
+
+ @Override
+ public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed");
+ }
+
+ try {
+ bcos.write(b, off, len);
+ } catch (final IOException ioe) {
+ recycle = false;
+ throw new IOException("Failed to write to " + this, ioe);
+ }
+
+ bytesWritten += len;
+ scc.setLength(bytesWritten + initialLength);
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed");
+ }
+
+ bcos.flush();
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ closed = true;
+
+ if (alwaysSync) {
+ ((FileOutputStream) bcos.getWrappedStream()).getFD().sync();
+ }
+
+ if (scc.getLength() < 0) {
+ // If claim was not written to, set length to 0
+ scc.setLength(0L);
+ }
+
+ // if we've not yet hit the threshold for appending to a resource claim, add the claim
+ // to the writableClaimQueue so that the Resource Claim can be used again when create()
+ // is called. In this case, we don't have to actually close the file stream. Instead, we
+ // can just add it onto the queue and continue to use it for the next content claim.
+ final long resourceClaimLength = scc.getOffset() + scc.getLength();
+ if (recycle && resourceClaimLength < maxAppendClaimLength) {
+ // we do not have to synchronize on the writable claim queue here because we
+ // are only adding something to the queue. We must synchronize if we are
+ // using a ResourceClaim from the queue and incrementing the claimant count on that resource
+ // because those need to be done atomically, or if we are destroying a claim that is on
+ // the queue because we need to ensure that the latter operation does not cause problems
+ // with the former.
+ final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength);
+ final boolean enqueued;
+ if (writableClaimQueue.contains(pair)) {
+ // may already exist on the queue, if the content claim is written to multiple times.
+ enqueued = true;
+ } else {
+ enqueued = writableClaimQueue.offer(pair);
+ }
+
+ if (enqueued) {
+ writableClaimStreams.put(scc.getResourceClaim(), bcos);
+ LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
+ } else {
+ writableClaimStreams.remove(scc.getResourceClaim());
+ bcos.close();
+
+ LOG.debug("Claim length less than max; Closing {}", this);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
+ }
+ }
+ } else {
+ // we've reached the limit for this claim. Don't add it back to our queue.
+ // Instead, just remove it and move on.
+ writableClaimStreams.remove(scc.getResourceClaim());
+ // ensure that the claim is no longer on the queue
+ writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
+ bcos.close();
+ LOG.debug("Claim lenth >= max; Closing {}", this);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
+ }
+ }
+ }
+ };
+
+ LOG.debug("Writing to {}", out);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for writing to " + out));
+ }
+
+ return out;
+ }
+
+
@Override
public void purge() {
// delete all content from repositories
@@ -788,7 +999,7 @@ public class FileSystemRepository implements ContentRepository {
}
}
- contentClaimManager.purge();
+ resourceClaimManager.purge();
}
private class BinDestructableClaims implements Runnable {
@@ -800,17 +1011,17 @@ public class FileSystemRepository implements ContentRepository {
// because the Container generally maps to a physical partition on the disk, so we want a few
// different threads hitting the different partitions but don't want multiple threads hitting
// the same partition.
- final List<ContentClaim> toDestroy = new ArrayList<>();
+ final List<ResourceClaim> toDestroy = new ArrayList<>();
while (true) {
toDestroy.clear();
- contentClaimManager.drainDestructableClaims(toDestroy, 10000);
+ resourceClaimManager.drainDestructableClaims(toDestroy, 10000);
if (toDestroy.isEmpty()) {
return;
}
- for (final ContentClaim claim : toDestroy) {
+ for (final ResourceClaim claim : toDestroy) {
final String container = claim.getContainer();
- final BlockingQueue<ContentClaim> claimQueue = reclaimable.get(container);
+ final BlockingQueue<ResourceClaim> claimQueue = reclaimable.get(container);
try {
while (true) {
@@ -838,7 +1049,7 @@ public class FileSystemRepository implements ContentRepository {
return sectionPath.resolve(ARCHIVE_DIR_NAME).resolve(claimId);
}
- private Path getArchivePath(final ContentClaim claim) {
+ private Path getArchivePath(final ResourceClaim claim) {
final String claimId = claim.getId();
final Path containerPath = containers.get(claim.getContainer());
final Path archivePath = containerPath.resolve(claim.getSection()).resolve(ARCHIVE_DIR_NAME).resolve(claimId);
@@ -859,22 +1070,28 @@ public class FileSystemRepository implements ContentRepository {
return true;
}
- return Files.exists(getArchivePath(contentClaim));
+ return Files.exists(getArchivePath(contentClaim.getResourceClaim()));
}
- private void archive(final ContentClaim contentClaim) throws IOException {
+ private void archive(final ResourceClaim claim) throws IOException {
if (!archiveData) {
return;
}
- final int claimantCount = getClaimantCount(contentClaim);
- if (claimantCount > 0) {
- throw new IllegalStateException("Cannot archive ContentClaim " + contentClaim + " because it is currently in use");
+ synchronized (writableClaimQueue) {
+ final int claimantCount = claim == null ? 0 : resourceClaimManager.getClaimantCount(claim);
+ if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+ return;
+ }
+ }
+
+ final Path curPath = getPath(claim);
+ if (curPath == null) {
+ return;
}
- final Path curPath = getPath(contentClaim, true);
archive(curPath);
- LOG.debug("Successfully moved {} to archive", contentClaim);
+ LOG.debug("Successfully moved {} to archive", claim);
}
private void archive(final Path curPath) throws IOException {
@@ -1107,8 +1324,8 @@ public class FileSystemRepository implements ContentRepository {
while (true) {
// look through each of the binned queues of Content Claims
int successCount = 0;
- final List<ContentClaim> toRemove = new ArrayList<>();
- for (final Map.Entry<String, BlockingQueue<ContentClaim>> entry : reclaimable.entrySet()) {
+ final List<ResourceClaim> toRemove = new ArrayList<>();
+ for (final Map.Entry<String, BlockingQueue<ResourceClaim>> entry : reclaimable.entrySet()) {
// drain the queue of all ContentClaims that can be destroyed for the given container.
final String container = entry.getKey();
final ContainerState containerState = containerStateMap.get(container);
@@ -1121,7 +1338,7 @@ public class FileSystemRepository implements ContentRepository {
// destroy each claim for this container
final long start = System.nanoTime();
- for (final ContentClaim claim : toRemove) {
+ for (final ResourceClaim claim : toRemove) {
if (archiveData) {
try {
archive(claim);
@@ -1210,7 +1427,7 @@ public class FileSystemRepository implements ContentRepository {
@Override
public void run() {
try {
- if (oldestArchiveDate.get() > (System.currentTimeMillis() - maxArchiveMillis)) {
+ if (oldestArchiveDate.get() > System.currentTimeMillis() - maxArchiveMillis) {
final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName);
if (minRequiredSpace == null) {
return;
@@ -1245,7 +1462,7 @@ public class FileSystemRepository implements ContentRepository {
if (oldestContainerArchive < 0L) {
boolean updated;
do {
- long oldest = oldestArchiveDate.get();
+ final long oldest = oldestArchiveDate.get();
if (oldestContainerArchive < oldest) {
updated = oldestArchiveDate.compareAndSet(oldest, oldestContainerArchive);
@@ -1298,7 +1515,7 @@ public class FileSystemRepository implements ContentRepository {
final long free = getContainerUsableSpace(containerName);
used = capacity - free;
bytesUsed = used;
- } catch (IOException e) {
+ } catch (final IOException e) {
return false;
}
}
@@ -1317,7 +1534,7 @@ public class FileSystemRepository implements ContentRepository {
try {
LOG.info("Unable to write to container {} due to archive file size constraints; waiting for archive cleanup", containerName);
condition.await();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
}
}
} finally {
@@ -1355,4 +1572,55 @@ public class FileSystemRepository implements ContentRepository {
}
}
+
+ private static class ClaimLengthPair {
+ private final ResourceClaim claim;
+ private final Long length;
+
+ public ClaimLengthPair(final ResourceClaim claim, final Long length) {
+ this.claim = claim;
+ this.length = length;
+ }
+
+ public ResourceClaim getClaim() {
+ return claim;
+ }
+
+ public Long getLength() {
+ return length;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (claim == null ? 0 : claim.hashCode());
+ return result;
+ }
+
+ /**
+ * Equality is determined purely by the ResourceClaim's equality
+ *
+ * @param obj the object to compare against
+ * @return true if obj is a ClaimLengthPair whose ResourceClaim equals this pair's ResourceClaim (the length is ignored), false otherwise
+ */
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ final ClaimLengthPair other = (ClaimLengthPair) obj;
+ return claim.equals(other.getClaim());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
index 6524cd3..cc8c734 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
@@ -95,7 +95,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
@Override
public boolean isPenalized() {
- return (penaltyExpirationMs > 0) ? penaltyExpirationMs > System.currentTimeMillis() : false;
+ return penaltyExpirationMs > 0 ? penaltyExpirationMs > System.currentTimeMillis() : false;
}
@Override
@@ -150,7 +150,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
public String toString() {
final ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
builder.append("uuid", getAttribute(CoreAttributes.UUID.key()));
- builder.append("claim", claim == null ? "" : claim.getId());
+ builder.append("claim", claim == null ? "" : claim.toString());
builder.append("offset", claimOffset);
builder.append("name", getAttribute(CoreAttributes.FILENAME.key())).append("size", size);
return builder.toString();
@@ -169,7 +169,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private final Set<String> bLineageIdentifiers = new HashSet<>();
private long bPenaltyExpirationMs = -1L;
private long bSize = 0L;
- private Map<String, String> bAttributes = new HashMap<>();
+ private final Map<String, String> bAttributes = new HashMap<>();
private ContentClaim bClaim = null;
private long bClaimOffset = 0L;
private long bLastQueueDate = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 04e819e..2ce48fa 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -44,6 +44,7 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowFileQueue;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
import org.apache.nifi.controller.repository.io.ByteCountingOutputStream;
import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
@@ -54,10 +55,6 @@ import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.repository.io.LongHolder;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.NonCloseableInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.QueueSize;
@@ -75,7 +72,8 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,10 +90,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback
public static final int VERBOSE_LOG_THRESHOLD = 10;
- private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize(
- NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue();
- private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim();
-
public static final String DEFAULT_FLOWFILE_PATH = "./";
private static final Logger LOG = LoggerFactory.getLogger(StandardProcessSession.class);
@@ -126,11 +120,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private long contentSizeIn = 0L, contentSizeOut = 0L;
private int writeRecursionLevel = 0;
- private ContentClaim currentWriteClaim = null;
- private OutputStream currentWriteClaimStream = null;
- private long currentWriteClaimSize = 0L;
- private int currentWriteClaimFlowFileCount = 0;
-
private ContentClaim currentReadClaim = null;
private ByteCountingInputStream currentReadClaimStream = null;
private long processingStartTime;
@@ -690,12 +679,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (originalClaim == null) {
builder.setCurrentContentClaim(null, null, null, null, 0L);
} else {
+ final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
builder.setCurrentContentClaim(
- originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
- );
+ resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+ repoRecord.getOriginal().getContentClaimOffset() + originalClaim.getOffset(), repoRecord.getOriginal().getSize());
builder.setPreviousContentClaim(
- originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
- );
+ resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+ repoRecord.getOriginal().getContentClaimOffset() + originalClaim.getOffset(), repoRecord.getOriginal().getSize());
}
}
@@ -711,14 +701,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ContentClaim currentClaim = repoRecord.getCurrentClaim();
final long currentOffset = repoRecord.getCurrentClaimOffset();
final long size = flowFile.getSize();
- recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size);
+
+ final ResourceClaim resourceClaim = currentClaim.getResourceClaim();
+ recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size);
}
if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
final ContentClaim originalClaim = repoRecord.getOriginalClaim();
final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
final long originalSize = repoRecord.getOriginal().getSize();
- recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize);
+
+ final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
+ recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), originalOffset + originalClaim.getOffset(), originalSize);
}
final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
@@ -741,14 +735,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ContentClaim currentClaim = repoRecord.getCurrentClaim();
final long currentOffset = repoRecord.getCurrentClaimOffset();
final long size = eventFlowFile.getSize();
- recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size);
+
+ final ResourceClaim resourceClaim = currentClaim.getResourceClaim();
+ recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size);
}
if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
final ContentClaim originalClaim = repoRecord.getOriginalClaim();
final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
final long originalSize = repoRecord.getOriginal().getSize();
- recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize);
+
+ final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
+ recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), originalOffset + originalClaim.getOffset(), originalSize);
}
final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
@@ -1650,8 +1648,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ContentClaim claim = record.getContentClaim();
if (claim != null) {
- enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
- enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ enriched.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+ record.getContentClaimOffset() + claim.getOffset(), record.getSize());
+ enriched.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+ record.getContentClaimOffset() + claim.getOffset(), record.getSize());
}
enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
@@ -1695,7 +1696,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
StreamUtils.skip(currentReadClaimStream, bytesToSkip);
}
- return new NonCloseableInputStream(currentReadClaimStream);
+ return new DisableOnCloseInputStream(currentReadClaimStream);
}
}
@@ -1711,7 +1712,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
// reuse the same InputStream for the next FlowFile
- return new NonCloseableInputStream(currentReadClaimStream);
+ return new DisableOnCloseInputStream(currentReadClaimStream);
} else {
final InputStream rawInStream = context.getContentRepository().read(claim);
StreamUtils.skip(rawInStream, offset);
@@ -1862,30 +1863,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return newFile;
}
- private void enforceCurrentWriteClaimState() {
- if (currentWriteClaimFlowFileCount > MAX_FLOWFILES_PER_CLAIM || currentWriteClaimSize > MAX_APPENDABLE_CLAIM_SIZE) {
- resetWriteClaims();
- }
-
- if (currentWriteClaimStream == null) {
- try {
- currentWriteClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
- claimLog.debug("Creating ContentClaim {} to enforce Current Write Claim State for {}", currentWriteClaim, context.getConnectable());
- } catch (final IOException e) {
- throw new FlowFileHandlingException("Unable to create ContentClaim due to " + e.toString(), e);
- }
-
- try {
- currentWriteClaimStream = context.getContentRepository().write(currentWriteClaim);
- } catch (final IOException e) {
- resetWriteClaims();
- throw new FlowFileAccessException("Unable to obtain stream for writing to Content Repostiory: " + e, e);
- }
- } else {
- context.getContentRepository().incrementClaimaintCount(currentWriteClaim);
- }
- }
-
private void ensureNotAppending(final ContentClaim claim) throws IOException {
if (claim == null) {
return;
@@ -1905,56 +1882,31 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
long newSize = 0L;
- long claimOffset = 0L;
+ final long claimOffset = 0L;
ContentClaim newClaim = null;
final LongHolder writtenHolder = new LongHolder(0L);
final boolean appendToClaim = isMergeContent();
try {
- if (appendToClaim) {
- enforceCurrentWriteClaimState();
- claimOffset = currentWriteClaimSize;
- newClaim = currentWriteClaim;
- ensureNotAppending(newClaim);
-
- try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream);
- final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
-
- recursionSet.add(source);
-
- writeRecursionLevel++;
- try {
- writer.process(new FlowFileAccessOutputStream(countingOut, source));
- } finally {
- writeRecursionLevel--;
- }
- } finally {
- recursionSet.remove(source);
- }
-
- final long writtenThisCall = writtenHolder.getValue();
- newSize = writtenThisCall;
- currentWriteClaimSize += newSize;
- } else {
- newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
- claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
+ newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+ claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
- ensureNotAppending(newClaim);
- try (final OutputStream stream = context.getContentRepository().write(newClaim);
- final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) {
- recursionSet.add(source);
+ ensureNotAppending(newClaim);
+ try (final OutputStream stream = context.getContentRepository().write(newClaim);
+ final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
+ final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
+ recursionSet.add(source);
- writeRecursionLevel++;
- try {
- writer.process(new FlowFileAccessOutputStream(countingOut, source));
- } finally {
- writeRecursionLevel--;
- }
+ writeRecursionLevel++;
+ try {
+ writer.process(new FlowFileAccessOutputStream(countingOut, source));
} finally {
- recursionSet.remove(source);
+ writeRecursionLevel--;
}
- newSize = context.getContentRepository().size(newClaim);
+ } finally {
+ recursionSet.remove(source);
}
+ newSize = context.getContentRepository().size(newClaim);
} catch (final ContentNotFoundException nfe) {
if (appendToClaim) {
resetWriteClaims(); // need to reset write claim before we can remove the claim
@@ -2115,18 +2067,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
private void resetWriteClaims() {
- try {
- if (currentWriteClaimStream != null) {
- currentWriteClaimStream.flush();
- currentWriteClaimStream.close();
- }
- } catch (final Exception e) {
- }
- currentWriteClaimStream = null;
- currentWriteClaim = null;
- currentWriteClaimFlowFileCount = 0;
- currentWriteClaimSize = 0L;
-
for (final ByteCountingOutputStream out : appendableStreams.values()) {
try {
out.flush();
@@ -2166,116 +2106,60 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ContentClaim newClaim = null;
long newSize = 0L;
- long claimOffset = 0L;
+ final long claimOffset = 0L;
final LongHolder writtenHolder = new LongHolder(0L);
- final boolean appendToClaim = isMergeContent();
try {
- if (appendToClaim) {
- enforceCurrentWriteClaimState();
- claimOffset = currentWriteClaimSize;
- newClaim = currentWriteClaim;
- ensureNotAppending(newClaim);
-
- try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset());
- final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
- final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
- final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
- final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream);
- final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
-
- recursionSet.add(source);
-
- // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
- // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
- // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
- // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
- // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
- final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim);
- boolean cnfeThrown = false;
-
- writeRecursionLevel++;
- try {
- writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source));
- } catch (final ContentNotFoundException cnfe) {
- cnfeThrown = true;
- throw cnfe;
- } finally {
- writeRecursionLevel--;
- recursionSet.remove(source);
-
- // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
- if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
- throw ffais.getContentNotFoundException();
- }
- }
- }
-
- final long writtenThisCall = writtenHolder.getValue();
- newSize = writtenThisCall;
- currentWriteClaimSize += writtenThisCall;
- } else {
- newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
- claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
+ newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+ claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
- ensureNotAppending(newClaim);
+ ensureNotAppending(newClaim);
- try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset());
- final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
- final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead);
- final OutputStream os = context.getContentRepository().write(newClaim);
- final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) {
+ try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset());
+ final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
+ final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+ final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
+ final OutputStream os = context.getContentRepository().write(newClaim);
+ final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
+ final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
- recursionSet.add(source);
+ recursionSet.add(source);
- // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
- // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
- // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
- // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
- // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
- final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim);
- boolean cnfeThrown = false;
+ // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
+ // Processor code. As a result, we have the FlowFileAccessInputStream that catches IOException from the repository
+ // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
+ // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
+ // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
+ final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim);
+ boolean cnfeThrown = false;
- writeRecursionLevel++;
- try {
- writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source));
- } catch (final ContentNotFoundException cnfe) {
- cnfeThrown = true;
- throw cnfe;
- } finally {
- writeRecursionLevel--;
- recursionSet.remove(source);
+ writeRecursionLevel++;
+ try {
+ writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source));
+ } catch (final ContentNotFoundException cnfe) {
+ cnfeThrown = true;
+ throw cnfe;
+ } finally {
+ writeRecursionLevel--;
+ recursionSet.remove(source);
- // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
- if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
- throw ffais.getContentNotFoundException();
- }
+ // if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate.
+ if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
+ throw ffais.getContentNotFoundException();
}
}
-
- newSize = context.getContentRepository().size(newClaim);
}
+
+ newSize = context.getContentRepository().size(newClaim);
} catch (final ContentNotFoundException nfe) {
- if (appendToClaim) {
- resetWriteClaims(); // need to reset write claim before we can remove the claim
- }
destroyContent(newClaim);
handleContentNotFound(nfe, record);
} catch (final IOException ioe) {
- if (appendToClaim) {
- resetWriteClaims(); // need to reset write claim before we can remove the claim
- }
destroyContent(newClaim);
throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
} catch (final FlowFileAccessException ffae) {
- if (appendToClaim) {
- resetWriteClaims();
- }
destroyContent(newClaim);
throw ffae;
} catch (final Throwable t) {
- if (appendToClaim) {
- resetWriteClaims(); // need to reset write claim before we can remove the claim
- }
destroyContent(newClaim);
throw t;
} finally {
@@ -2302,33 +2186,21 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ContentClaim newClaim;
final long claimOffset;
- final boolean appendToClaim = isMergeContent();
- if (appendToClaim) {
- enforceCurrentWriteClaimState();
- newClaim = currentWriteClaim;
- claimOffset = currentWriteClaimSize;
- } else {
- try {
- newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
- claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
- } catch (final IOException e) {
- throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
- }
-
- claimOffset = 0L;
+ try {
+ newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+ claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
+ } catch (final IOException e) {
+ throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
}
+ claimOffset = 0L;
long newSize = 0L;
try {
final boolean append = isMergeContent();
newSize = context.getContentRepository().importFrom(source, newClaim, append);
bytesWritten.increment(newSize);
bytesRead.increment(newSize);
- currentWriteClaimSize += newSize;
} catch (final Throwable t) {
- if (appendToClaim) {
- resetWriteClaims();
- }
destroyContent(newClaim);
throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
}
@@ -2351,31 +2223,19 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
validateRecordState(destination);
final StandardRepositoryRecord record = records.get(destination);
ContentClaim newClaim = null;
- long claimOffset = 0L;
+ final long claimOffset = 0L;
final long newSize;
final boolean appendToClaim = isMergeContent();
try {
- if (appendToClaim) {
- enforceCurrentWriteClaimState();
- newClaim = currentWriteClaim;
- claimOffset = currentWriteClaimSize;
-
- final long bytesCopied = StreamUtils.copy(source, currentWriteClaimStream);
- bytesWritten.increment(bytesCopied);
- currentWriteClaimSize += bytesCopied;
- newSize = bytesCopied;
- } else {
- try {
- newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
- claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
+ try {
+ newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+ claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
- newSize = context.getContentRepository().importFrom(source, newClaim, appendToClaim);
- bytesWritten.increment(newSize);
- currentWriteClaimSize += newSize;
- } catch (final IOException e) {
- throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
- }
+ newSize = context.getContentRepository().importFrom(source, newClaim, appendToClaim);
+ bytesWritten.increment(newSize);
+ } catch (final IOException e) {
+ throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
}
} catch (final Throwable t) {
if (appendToClaim) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 3bfdd8a..6c1626c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -27,8 +27,10 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -38,7 +40,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream;
import org.apache.nifi.controller.repository.io.MemoryManager;
@@ -92,7 +95,7 @@ public class VolatileContentRepository implements ContentRepository {
private final ConcurrentMap<ContentClaim, ContentClaim> backupRepoClaimMap = new ConcurrentHashMap<>(256);
private final AtomicReference<ContentRepository> backupRepositoryRef = new AtomicReference<>(null);
- private ContentClaimManager claimManager; // effectively final
+ private ResourceClaimManager claimManager; // effectively final
public VolatileContentRepository() {
this(NiFiProperties.getInstance());
@@ -119,7 +122,7 @@ public class VolatileContentRepository implements ContentRepository {
}
@Override
- public void initialize(final ContentClaimManager claimManager) {
+ public void initialize(final ResourceClaimManager claimManager) {
this.claimManager = claimManager;
for (int i = 0; i < 3; i++) {
@@ -199,9 +202,10 @@ public class VolatileContentRepository implements ContentRepository {
private ContentClaim createLossTolerant() {
final long id = idGenerator.getAndIncrement();
- final ContentClaim claim = claimManager.newContentClaim(CONTAINER_NAME, "section", String.valueOf(id), true);
+ final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true);
+ final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L);
final ContentBlock contentBlock = new ContentBlock(claim, repoSize);
- claimManager.incrementClaimantCount(claim, true);
+ claimManager.incrementClaimantCount(resourceClaim, true);
claimMap.put(claim, contentBlock);
@@ -216,7 +220,7 @@ public class VolatileContentRepository implements ContentRepository {
}
final ContentClaim backupClaim = getBackupClaim(claim);
if (backupClaim == null) {
- return claimManager.incrementClaimantCount(resolveClaim(claim));
+ return claimManager.incrementClaimantCount(resolveClaim(claim).getResourceClaim());
} else {
return getBackupRepository().incrementClaimaintCount(backupClaim);
}
@@ -230,7 +234,7 @@ public class VolatileContentRepository implements ContentRepository {
final ContentClaim backupClaim = getBackupClaim(claim);
if (backupClaim == null) {
- return claimManager.decrementClaimantCount(resolveClaim(claim));
+ return claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim());
} else {
return getBackupRepository().decrementClaimantCount(backupClaim);
}
@@ -244,7 +248,7 @@ public class VolatileContentRepository implements ContentRepository {
final ContentClaim backupClaim = getBackupClaim(claim);
if (backupClaim == null) {
- return claimManager.getClaimantCount(resolveClaim(claim));
+ return claimManager.getClaimantCount(resolveClaim(claim).getResourceClaim());
} else {
return getBackupRepository().getClaimantCount(backupClaim);
}
@@ -273,6 +277,29 @@ public class VolatileContentRepository implements ContentRepository {
return true;
}
+ private boolean remove(final ResourceClaim claim) {
+ if (claim == null) {
+ return false;
+ }
+
+ final Set<ContentClaim> contentClaims = new HashSet<>();
+ for (final Map.Entry<ContentClaim, ContentBlock> entry : claimMap.entrySet()) {
+ final ContentClaim contentClaim = entry.getKey();
+ if (contentClaim.getResourceClaim().equals(claim)) {
+ contentClaims.add(contentClaim);
+ }
+ }
+
+ boolean removed = false;
+ for (final ContentClaim contentClaim : contentClaims) {
+ if (remove(contentClaim)) {
+ removed = true;
+ }
+ }
+
+ return removed;
+ }
+
@Override
public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException {
final ContentClaim createdClaim = create(lossTolerant);
@@ -435,7 +462,7 @@ public class VolatileContentRepository implements ContentRepository {
@Override
public void purge() {
for (final ContentClaim claim : claimMap.keySet()) {
- claimManager.decrementClaimantCount(resolveClaim(claim));
+ claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim());
final ContentClaim backup = getBackupClaim(claim);
if (backup != null) {
getBackupRepository().remove(backup);
@@ -624,7 +651,7 @@ public class VolatileContentRepository implements ContentRepository {
@Override
public void run() {
- final List<ContentClaim> destructable = new ArrayList<>(1000);
+ final List<ResourceClaim> destructable = new ArrayList<>(1000);
while (true) {
destructable.clear();
claimManager.drainDestructableClaims(destructable, 1000, 5, TimeUnit.SECONDS);
@@ -632,7 +659,7 @@ public class VolatileContentRepository implements ContentRepository {
return;
}
- for (final ContentClaim claim : destructable) {
+ for (final ResourceClaim claim : destructable) {
remove(claim);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50379fcc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index fe34fe0..a85b23b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -22,7 +22,9 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
/**
* <p>
@@ -32,10 +34,10 @@ import org.apache.nifi.controller.repository.claim.ContentClaimManager;
public class VolatileFlowFileRepository implements FlowFileRepository {
private final AtomicLong idGenerator = new AtomicLong(0L);
- private ContentClaimManager claimManager; // effectively final
+ private ResourceClaimManager claimManager; // effectively final
@Override
- public void initialize(final ContentClaimManager claimManager) {
+ public void initialize(final ResourceClaimManager claimManager) {
this.claimManager = claimManager;
}
@@ -58,23 +60,49 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
public void close() throws IOException {
}
+ private void markDestructable(final ContentClaim contentClaim) {
+ if (contentClaim == null) {
+ return;
+ }
+
+ final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+ if (resourceClaim == null) {
+ return;
+ }
+
+ claimManager.markDestructable(resourceClaim);
+ }
+
+ private int getClaimantCount(final ContentClaim claim) {
+ if (claim == null) {
+ return 0;
+ }
+
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ if (resourceClaim == null) {
+ return 0;
+ }
+
+ return claimManager.getClaimantCount(resourceClaim);
+ }
+
@Override
public void updateRepository(final Collection<RepositoryRecord> records) throws IOException {
for (final RepositoryRecord record : records) {
if (record.getType() == RepositoryRecordType.DELETE) {
// For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable
- if (record.getCurrentClaim() != null && claimManager.getClaimantCount(record.getCurrentClaim()) <= 0) {
- claimManager.markDestructable(record.getCurrentClaim());
+ if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) {
+ markDestructable(record.getCurrentClaim());
}
// If the original claim is different than the current claim and the original claim has a claimant count <= 0, mark it as destructable.
- if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
- claimManager.markDestructable(record.getOriginalClaim());
+ if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && getClaimantCount(record.getOriginalClaim()) <= 0) {
+ markDestructable(record.getOriginalClaim());
}
} else if (record.getType() == RepositoryRecordType.UPDATE) {
// if we have an update, and the original is no longer needed, mark original as destructable
- if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
- claimManager.markDestructable(record.getOriginalClaim());
+ if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && getClaimantCount(record.getOriginalClaim()) <= 0) {
+ markDestructable(record.getOriginalClaim());
}
}
}