You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/07/31 19:48:56 UTC

nifi git commit: NIFI-744: Do not allow StandardContentClaim's offset to be updated. Data should be read-only once it has been written

Repository: nifi
Updated Branches:
  refs/heads/NIFI-744 b8cee5105 -> 53a6e962d


NIFI-744: Do not allow StandardContentClaim's offset to be updated. Data should be read-only once it has been written


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/53a6e962
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/53a6e962
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/53a6e962

Branch: refs/heads/NIFI-744
Commit: 53a6e962d6c8a29e80c61e9f75af20cae9abe25d
Parents: b8cee51
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jul 31 13:48:32 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jul 31 13:48:32 2015 -0400

----------------------------------------------------------------------
 .../repository/ContentRepository.java           | 10 ++-
 .../repository/FileSystemRepository.java        | 13 ++--
 .../repository/StandardProcessSession.java      | 27 ++++---
 .../repository/claim/StandardContentClaim.java  | 18 ++---
 .../repository/TestFileSystemRepository.java    | 76 --------------------
 5 files changed, 37 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index da87d75..8d0bdb3 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -173,9 +173,13 @@ public interface ContentRepository {
      * @param content to import from
      * @param claim the claim to write imported content to
      * @param append if true, the content will be appended to the claim; if
-     * false, the content will replace the contents of the claim
+     *        false, the content will replace the contents of the claim
      * @throws IOException if unable to read content
+     *
+     * @deprecated if needing to append to a content claim, the contents of the claim should be
+     *             copied to a new claim and then the data to append should be written to that new claim.
      */
+    @Deprecated
     long importFrom(Path content, ContentClaim claim, boolean append) throws IOException;
 
     /**
@@ -198,7 +202,11 @@ public interface ContentRepository {
      * @param append whether to append or replace
      * @return length of data imported in bytes
      * @throws IOException if failure to read or write stream
+     *
+     * @deprecated if needing to append to a content claim, the contents of the claim should be
+     * copied to a new claim and then the data to append should be written to that new claim.
      */
+    @Deprecated
     long importFrom(InputStream content, ContentClaim claim, boolean append) throws IOException;
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 3a6338c..ced198c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -764,7 +764,7 @@ public class FileSystemRepository implements ContentRepository {
 
         // see javadocs for claim.getLength() as to why we do this.
         if (claim.getLength() < 0) {
-            return Files.size(getPath(claim, true));
+			return Files.size(getPath(claim, true)) - claim.getOffset();
         }
 
         return claim.getLength();
@@ -806,6 +806,9 @@ public class FileSystemRepository implements ContentRepository {
         }
 
         final StandardContentClaim scc = (StandardContentClaim) claim;
+        if (claim.getLength() > 0) {
+            throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to.");
+        }
 
         // we always append because there may be another ContentClaim using the same resource claim.
         // However, we know that we will never write to the same claim from two different threads
@@ -823,7 +826,6 @@ public class FileSystemRepository implements ContentRepository {
                 initialLength = Math.max(0, scc.getLength());
             } else {
                 initialLength = 0;
-                scc.setOffset(claimStream.getBytesWritten());
             }
         }
 
@@ -936,10 +938,11 @@ public class FileSystemRepository implements ContentRepository {
                         writableClaimStreams.put(scc.getResourceClaim(), bcos);
                         LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
                     } else {
-                        writableClaimStreams.remove(scc.getResourceClaim());
                         bcos.close();
 
-                        LOG.debug("Claim length less than max; Closing {}", this);
+						LOG.debug(
+								"Claim length less than max; Closing {} because could not add back to queue",
+								this);
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
                         }
@@ -947,7 +950,7 @@ public class FileSystemRepository implements ContentRepository {
                 } else {
                     // we've reached the limit for this claim. Don't add it back to our queue.
                     // Instead, just remove it and move on.
-                    writableClaimStreams.remove(scc.getResourceClaim());
+
                     // ensure that the claim is no longer on the queue
                     writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
                     bcos.close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 3e2868e..62a001c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1880,8 +1880,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     public FlowFile write(final FlowFile source, final OutputStreamCallback writer) {
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
-        long newSize = 0L;
-        final long claimOffset = 0L;
 
         ContentClaim newClaim = null;
         final LongHolder writtenHolder = new LongHolder(0L);
@@ -1898,7 +1896,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             } finally {
                 recursionSet.remove(source);
             }
-            newSize = context.getContentRepository().size(newClaim);
         } catch (final ContentNotFoundException nfe) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
             destroyContent(newClaim);
@@ -1920,7 +1917,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         removeTemporaryClaim(record);
-        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build();
+        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+        	.fromFlowFile(record.getCurrent())
+        	.contentClaim(newClaim)
+        	.contentClaimOffset(0)
+        	.size(writtenHolder.getValue())
+        	.build();
+
         record.setWorking(newFile);
         return newFile;
     }
@@ -2068,8 +2071,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final ContentClaim currClaim = record.getCurrentClaim();
 
         ContentClaim newClaim = null;
-        long newSize = 0L;
-        final long claimOffset = 0L;
         final LongHolder writtenHolder = new LongHolder(0L);
         try {
             newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
@@ -2109,8 +2110,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     }
                 }
             }
-
-            newSize = context.getContentRepository().size(newClaim);
         } catch (final ContentNotFoundException nfe) {
             destroyContent(newClaim);
             handleContentNotFound(nfe, record);
@@ -2128,7 +2127,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         removeTemporaryClaim(record);
-        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build();
+        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+        	.fromFlowFile(record.getCurrent())
+        	.contentClaim(newClaim)
+        	.contentClaimOffset(0L)
+        	.size(writtenHolder.getValue())
+        	.build();
+
         record.setWorking(newFile);
         return newFile;
     }
@@ -2157,7 +2162,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         claimOffset = 0L;
         long newSize = 0L;
         try {
-            newSize = context.getContentRepository().importFrom(source, newClaim, false);
+            newSize = context.getContentRepository().importFrom(source, newClaim);
             bytesWritten.increment(newSize);
             bytesRead.increment(newSize);
         } catch (final Throwable t) {
@@ -2191,7 +2196,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
                 claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
 
-                newSize = context.getContentRepository().importFrom(source, newClaim, false);
+                newSize = context.getContentRepository().importFrom(source, newClaim);
                 bytesWritten.increment(newSize);
             } catch (final IOException e) {
                 throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
index 753e818..62ff276 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
@@ -28,29 +28,24 @@ package org.apache.nifi.controller.repository.claim;
 public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> {
 
     private final ResourceClaim resourceClaim;
-    private final int hashCode;
-    private volatile long offset;
+    private final long offset;
     private volatile long length;
 
     public StandardContentClaim(final ResourceClaim resourceClaim, final long offset) {
         this.resourceClaim = resourceClaim;
         this.offset = offset;
         this.length = -1L;
-        this.hashCode = calculateHashCode();
     }
 
     public void setLength(final long length) {
         this.length = length;
     }
 
-    public void setOffset(final long offset) {
-        this.offset = offset;
-    }
-
-    private int calculateHashCode() {
+	@Override
+	public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + hashCode;
+		result = prime * result;
         result = prime * result + (int) (length ^ length >>> 32);
         result = prime * result + (int) (offset ^ offset >>> 32);
         result = prime * result + (resourceClaim == null ? 0 : resourceClaim.hashCode());
@@ -58,11 +53,6 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont
     }
 
     @Override
-    public int hashCode() {
-        return this.hashCode;
-    }
-
-    @Override
     public boolean equals(final Object obj) {
         if (this == obj) {
             return true;

http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 519ba9c..5ffcb3d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -36,10 +36,8 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Random;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.util.DiskUtils;
 import org.apache.nifi.stream.io.StreamUtils;
@@ -136,54 +134,6 @@ public class TestFileSystemRepository {
         assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim());
     }
 
-    @Test
-    public void testRewriteContentClaim() throws IOException {
-        final ContentClaim claim1 = repository.create(false);
-        assertEquals(1, repository.getClaimantCount(claim1));
-
-        try (final OutputStream out = repository.write(claim1)) {
-            out.write("abc".getBytes());
-        }
-        assertEquals(1, repository.getClaimantCount(claim1));
-
-        try (final OutputStream out = repository.write(claim1)) {
-            out.write("cba".getBytes());
-        }
-        assertEquals(1, repository.getClaimantCount(claim1));
-
-        try (final InputStream in = repository.read(claim1)) {
-            assertEquals('c', in.read());
-            assertEquals('b', in.read());
-            assertEquals('a', in.read());
-        }
-        assertEquals(1, repository.getClaimantCount(claim1));
-
-        assertEquals(3, repository.size(claim1));
-
-        final byte[] oneMB = new byte[1024 * 1024 - 6];
-        new Random().nextBytes(oneMB);
-        try (final OutputStream out = repository.write(claim1)) {
-            out.write(oneMB);
-        }
-        assertEquals(1, repository.getClaimantCount(claim1));
-
-        assertEquals(1024 * 1024 - 6, repository.size(claim1));
-        try (final InputStream in = repository.read(claim1)) {
-            final byte[] buff = new byte[oneMB.length];
-            StreamUtils.fillBuffer(in, buff);
-            assertTrue(Arrays.equals(buff, oneMB));
-        }
-
-        final ResourceClaim resourceClaim = claim1.getResourceClaim();
-        final Path path = rootFile.toPath().resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
-        assertTrue(Files.exists(path));
-        assertEquals(0, repository.decrementClaimantCount(claim1));
-        assertTrue(repository.remove(claim1));
-        assertFalse(Files.exists(path));
-
-        final ContentClaim claim2 = repository.create(false);
-        assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim());
-    }
 
     @Test
     public void testWriteWithNoContent() throws IOException {
@@ -292,32 +242,6 @@ public class TestFileSystemRepository {
         assertTrue(Arrays.equals(expected, baos.toByteArray()));
     }
 
-    @Test
-    public void testImportFromFileWithAppend() throws IOException {
-        final ContentClaim claim = repository.create(false);
-        final File hello = new File("src/test/resources/hello.txt");
-        final File goodbye = new File("src/test/resources/bye.txt");
-
-        repository.importFrom(hello.toPath(), claim, true);
-        assertContentEquals(claim, "Hello, World");
-
-        repository.importFrom(goodbye.toPath(), claim, true);
-        assertContentEquals(claim, "Hello, WorldGood-Bye, World!");
-
-        repository.importFrom(hello.toPath(), claim, true);
-        assertContentEquals(claim, "Hello, WorldGood-Bye, World!Hello, World");
-
-        repository.importFrom(goodbye.toPath(), claim, false);
-        assertContentEquals(claim, "Good-Bye, World!");
-    }
-
-    private void assertContentEquals(final ContentClaim claim, final String expected) throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (final InputStream in = repository.read(claim)) {
-            StreamUtils.copy(in, baos);
-        }
-        assertEquals(expected, new String(baos.toByteArray()));
-    }
 
     @Test
     public void testImportFromStream() throws IOException {