You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/07/31 19:48:56 UTC
nifi git commit: NIFI-744: Do not allow StandardContentClaim's offset
to be updated. Data should be read-only once it has been written
Repository: nifi
Updated Branches:
refs/heads/NIFI-744 b8cee5105 -> 53a6e962d
NIFI-744: Do not allow StandardContentClaim's offset to be updated. Data should be read-only once it has been written
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/53a6e962
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/53a6e962
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/53a6e962
Branch: refs/heads/NIFI-744
Commit: 53a6e962d6c8a29e80c61e9f75af20cae9abe25d
Parents: b8cee51
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jul 31 13:48:32 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jul 31 13:48:32 2015 -0400
----------------------------------------------------------------------
.../repository/ContentRepository.java | 10 ++-
.../repository/FileSystemRepository.java | 13 ++--
.../repository/StandardProcessSession.java | 27 ++++---
.../repository/claim/StandardContentClaim.java | 18 ++---
.../repository/TestFileSystemRepository.java | 76 --------------------
5 files changed, 37 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index da87d75..8d0bdb3 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -173,9 +173,13 @@ public interface ContentRepository {
* @param content to import from
* @param claim the claim to write imported content to
* @param append if true, the content will be appended to the claim; if
- * false, the content will replace the contents of the claim
+ * false, the content will replace the contents of the claim
* @throws IOException if unable to read content
+ *
+ * @deprecated if needing to append to a content claim, the contents of the claim should be
+ * copied to a new claim and then the data to append should be written to that new claim.
*/
+ @Deprecated
long importFrom(Path content, ContentClaim claim, boolean append) throws IOException;
/**
@@ -198,7 +202,11 @@ public interface ContentRepository {
* @param append whether to append or replace
* @return length of data imported in bytes
* @throws IOException if failure to read or write stream
+ *
+ * @deprecated if needing to append to a content claim, the contents of the claim should be
+ * copied to a new claim and then the data to append should be written to that new claim.
*/
+ @Deprecated
long importFrom(InputStream content, ContentClaim claim, boolean append) throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 3a6338c..ced198c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -764,7 +764,7 @@ public class FileSystemRepository implements ContentRepository {
// see javadocs for claim.getLength() as to why we do this.
if (claim.getLength() < 0) {
- return Files.size(getPath(claim, true));
+ return Files.size(getPath(claim, true)) - claim.getOffset();
}
return claim.getLength();
@@ -806,6 +806,9 @@ public class FileSystemRepository implements ContentRepository {
}
final StandardContentClaim scc = (StandardContentClaim) claim;
+ if (claim.getLength() > 0) {
+ throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to.");
+ }
// we always append because there may be another ContentClaim using the same resource claim.
// However, we know that we will never write to the same claim from two different threads
@@ -823,7 +826,6 @@ public class FileSystemRepository implements ContentRepository {
initialLength = Math.max(0, scc.getLength());
} else {
initialLength = 0;
- scc.setOffset(claimStream.getBytesWritten());
}
}
@@ -936,10 +938,11 @@ public class FileSystemRepository implements ContentRepository {
writableClaimStreams.put(scc.getResourceClaim(), bcos);
LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
} else {
- writableClaimStreams.remove(scc.getResourceClaim());
bcos.close();
- LOG.debug("Claim length less than max; Closing {}", this);
+ LOG.debug(
+ "Claim length less than max; Closing {} because could not add back to queue",
+ this);
if (LOG.isTraceEnabled()) {
LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
}
@@ -947,7 +950,7 @@ public class FileSystemRepository implements ContentRepository {
} else {
// we've reached the limit for this claim. Don't add it back to our queue.
// Instead, just remove it and move on.
- writableClaimStreams.remove(scc.getResourceClaim());
+
// ensure that the claim is no longer on the queue
writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
bcos.close();
http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 3e2868e..62a001c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1880,8 +1880,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
public FlowFile write(final FlowFile source, final OutputStreamCallback writer) {
validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
- long newSize = 0L;
- final long claimOffset = 0L;
ContentClaim newClaim = null;
final LongHolder writtenHolder = new LongHolder(0L);
@@ -1898,7 +1896,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} finally {
recursionSet.remove(source);
}
- newSize = context.getContentRepository().size(newClaim);
} catch (final ContentNotFoundException nfe) {
resetWriteClaims(); // need to reset write claim before we can remove the claim
destroyContent(newClaim);
@@ -1920,7 +1917,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
removeTemporaryClaim(record);
- final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build();
+ final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+ .fromFlowFile(record.getCurrent())
+ .contentClaim(newClaim)
+ .contentClaimOffset(0)
+ .size(writtenHolder.getValue())
+ .build();
+
record.setWorking(newFile);
return newFile;
}
@@ -2068,8 +2071,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ContentClaim currClaim = record.getCurrentClaim();
ContentClaim newClaim = null;
- long newSize = 0L;
- final long claimOffset = 0L;
final LongHolder writtenHolder = new LongHolder(0L);
try {
newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
@@ -2109,8 +2110,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
}
-
- newSize = context.getContentRepository().size(newClaim);
} catch (final ContentNotFoundException nfe) {
destroyContent(newClaim);
handleContentNotFound(nfe, record);
@@ -2128,7 +2127,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
removeTemporaryClaim(record);
- final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build();
+ final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+ .fromFlowFile(record.getCurrent())
+ .contentClaim(newClaim)
+ .contentClaimOffset(0L)
+ .size(writtenHolder.getValue())
+ .build();
+
record.setWorking(newFile);
return newFile;
}
@@ -2157,7 +2162,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
claimOffset = 0L;
long newSize = 0L;
try {
- newSize = context.getContentRepository().importFrom(source, newClaim, false);
+ newSize = context.getContentRepository().importFrom(source, newClaim);
bytesWritten.increment(newSize);
bytesRead.increment(newSize);
} catch (final Throwable t) {
@@ -2191,7 +2196,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
- newSize = context.getContentRepository().importFrom(source, newClaim, false);
+ newSize = context.getContentRepository().importFrom(source, newClaim);
bytesWritten.increment(newSize);
} catch (final IOException e) {
throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
index 753e818..62ff276 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
@@ -28,29 +28,24 @@ package org.apache.nifi.controller.repository.claim;
public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> {
private final ResourceClaim resourceClaim;
- private final int hashCode;
- private volatile long offset;
+ private final long offset;
private volatile long length;
public StandardContentClaim(final ResourceClaim resourceClaim, final long offset) {
this.resourceClaim = resourceClaim;
this.offset = offset;
this.length = -1L;
- this.hashCode = calculateHashCode();
}
public void setLength(final long length) {
this.length = length;
}
- public void setOffset(final long offset) {
- this.offset = offset;
- }
-
- private int calculateHashCode() {
+ @Override
+ public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + hashCode;
+ result = prime * result;
result = prime * result + (int) (length ^ length >>> 32);
result = prime * result + (int) (offset ^ offset >>> 32);
result = prime * result + (resourceClaim == null ? 0 : resourceClaim.hashCode());
@@ -58,11 +53,6 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont
}
@Override
- public int hashCode() {
- return this.hashCode;
- }
-
- @Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 519ba9c..5ffcb3d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -36,10 +36,8 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Random;
import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.util.DiskUtils;
import org.apache.nifi.stream.io.StreamUtils;
@@ -136,54 +134,6 @@ public class TestFileSystemRepository {
assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim());
}
- @Test
- public void testRewriteContentClaim() throws IOException {
- final ContentClaim claim1 = repository.create(false);
- assertEquals(1, repository.getClaimantCount(claim1));
-
- try (final OutputStream out = repository.write(claim1)) {
- out.write("abc".getBytes());
- }
- assertEquals(1, repository.getClaimantCount(claim1));
-
- try (final OutputStream out = repository.write(claim1)) {
- out.write("cba".getBytes());
- }
- assertEquals(1, repository.getClaimantCount(claim1));
-
- try (final InputStream in = repository.read(claim1)) {
- assertEquals('c', in.read());
- assertEquals('b', in.read());
- assertEquals('a', in.read());
- }
- assertEquals(1, repository.getClaimantCount(claim1));
-
- assertEquals(3, repository.size(claim1));
-
- final byte[] oneMB = new byte[1024 * 1024 - 6];
- new Random().nextBytes(oneMB);
- try (final OutputStream out = repository.write(claim1)) {
- out.write(oneMB);
- }
- assertEquals(1, repository.getClaimantCount(claim1));
-
- assertEquals(1024 * 1024 - 6, repository.size(claim1));
- try (final InputStream in = repository.read(claim1)) {
- final byte[] buff = new byte[oneMB.length];
- StreamUtils.fillBuffer(in, buff);
- assertTrue(Arrays.equals(buff, oneMB));
- }
-
- final ResourceClaim resourceClaim = claim1.getResourceClaim();
- final Path path = rootFile.toPath().resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
- assertTrue(Files.exists(path));
- assertEquals(0, repository.decrementClaimantCount(claim1));
- assertTrue(repository.remove(claim1));
- assertFalse(Files.exists(path));
-
- final ContentClaim claim2 = repository.create(false);
- assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim());
- }
@Test
public void testWriteWithNoContent() throws IOException {
@@ -292,32 +242,6 @@ public class TestFileSystemRepository {
assertTrue(Arrays.equals(expected, baos.toByteArray()));
}
- @Test
- public void testImportFromFileWithAppend() throws IOException {
- final ContentClaim claim = repository.create(false);
- final File hello = new File("src/test/resources/hello.txt");
- final File goodbye = new File("src/test/resources/bye.txt");
-
- repository.importFrom(hello.toPath(), claim, true);
- assertContentEquals(claim, "Hello, World");
-
- repository.importFrom(goodbye.toPath(), claim, true);
- assertContentEquals(claim, "Hello, WorldGood-Bye, World!");
-
- repository.importFrom(hello.toPath(), claim, true);
- assertContentEquals(claim, "Hello, WorldGood-Bye, World!Hello, World");
-
- repository.importFrom(goodbye.toPath(), claim, false);
- assertContentEquals(claim, "Good-Bye, World!");
- }
-
- private void assertContentEquals(final ContentClaim claim, final String expected) throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (final InputStream in = repository.read(claim)) {
- StreamUtils.copy(in, baos);
- }
- assertEquals(expected, new String(baos.toByteArray()));
- }
@Test
public void testImportFromStream() throws IOException {