You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/01/07 16:07:57 UTC

[GitHub] asfgit closed pull request #3207: NIFI-5879: Fixed bug in FileSystemRepository that can occur if an Inp…

asfgit closed pull request #3207: NIFI-5879: Fixed bug in FileSystemRepository that can occur if an Inp…
URL: https://github.com/apache/nifi/pull/3207
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index c041f5c91b..125cd500e8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -864,9 +864,16 @@ public InputStream read(final ContentClaim claim) throws IOException {
 
         }
 
-        // see javadocs for claim.getLength() as to why we do this.
+        // A claim length of -1 indicates that the claim is still being written to and we don't know
+        // the length. In this case, we don't limit the Input Stream. If the Length has been populated, though,
+        // it is possible that the Length could then be extended. However, we do want to avoid ever allowing the
+        // stream to read past the end of the Content Claim. To accomplish this, we use a LimitedInputStream but
+        // provide a LongSupplier for the length instead of a Long value. this allows us to continue reading until
+        // we get to the end of the Claim, even if the Claim grows. This may happen, for instance, if we obtain an
+        // InputStream for this claim, then read from it, write more to the claim, and then attempt to read again. In
+        // such a case, since we have written to that same Claim, we should still be able to read those bytes.
         if (claim.getLength() >= 0) {
-            return new LimitedInputStream(fis, claim.getLength());
+            return new LimitedInputStream(fis, claim::getLength);
         } else {
             return fis;
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 4354dc416b..cc3ac19905 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2267,7 +2267,9 @@ public InputStream read(FlowFile source) {
         final StandardRepositoryRecord record = getRecord(source);
 
         try {
-            ensureNotAppending(record.getCurrentClaim());
+            final ContentClaim currentClaim = record.getCurrentClaim();
+            ensureNotAppending(currentClaim);
+            claimCache.flush(currentClaim);
         } catch (final IOException e) {
             throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/LimitedInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/LimitedInputStream.java
index 74597ae51e..7c32cc8c08 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/LimitedInputStream.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/LimitedInputStream.java
@@ -18,21 +18,36 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Objects;
+import java.util.function.LongSupplier;
 
 public class LimitedInputStream extends InputStream {
 
     private final InputStream in;
-    private long limit;
+    private final long limit;
+    private final LongSupplier limitSupplier;
     private long bytesRead = 0;
+    private long markOffset = -1L;
+
+    public LimitedInputStream(final InputStream in, final LongSupplier limitSupplier) {
+        this.in = in;
+        this.limitSupplier = Objects.requireNonNull(limitSupplier);
+        this.limit = -1;
+    }
 
     public LimitedInputStream(final InputStream in, final long limit) {
         this.in = in;
         this.limit = limit;
+        this.limitSupplier = null;
+    }
+
+    private long getLimit() {
+        return limitSupplier == null ? limit : limitSupplier.getAsLong();
     }
 
     @Override
     public int read() throws IOException {
-        if (bytesRead >= limit) {
+        if (bytesRead >= getLimit()) {
             return -1;
         }
 
@@ -45,6 +60,7 @@ public int read() throws IOException {
 
     @Override
     public int read(final byte[] b) throws IOException {
+        final long limit = getLimit();
         if (bytesRead >= limit) {
             return -1;
         }
@@ -60,6 +76,7 @@ public int read(final byte[] b) throws IOException {
 
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
+        final long limit = getLimit();
         if (bytesRead >= limit) {
             return -1;
         }
@@ -75,14 +92,14 @@ public int read(byte[] b, int off, int len) throws IOException {
 
     @Override
     public long skip(final long n) throws IOException {
-        final long skipped = in.skip(Math.min(n, limit - bytesRead));
+        final long skipped = in.skip(Math.min(n, getLimit() - bytesRead));
         bytesRead += skipped;
         return skipped;
     }
 
     @Override
     public int available() throws IOException {
-        return (int)(limit - bytesRead);
+        return (int)(getLimit() - bytesRead);
     }
 
     @Override
@@ -93,8 +110,7 @@ public void close() throws IOException {
     @Override
     public void mark(int readlimit) {
         in.mark(readlimit);
-        limit -= bytesRead;
-        bytesRead = 0;
+        markOffset = bytesRead;
     }
 
     @Override
@@ -105,6 +121,10 @@ public boolean markSupported() {
     @Override
     public void reset() throws IOException {
         in.reset();
-        bytesRead = 0;
+
+        if (markOffset >= 0) {
+            bytesRead = markOffset;
+        }
+        markOffset = -1;
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 3ecff71b87..bf1a5798aa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -16,12 +16,23 @@
  */
 package org.apache.nifi.controller.repository;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.util.DiskUtils;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -45,24 +56,12 @@
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
-import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
-import org.apache.nifi.controller.repository.util.DiskUtils;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.Logger;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeFalse;
 
 public class TestFileSystemRepository {
@@ -191,6 +190,28 @@ public void testCreateContentClaim() throws IOException {
         assertEquals(1, repository.getClaimantCount(claim));
     }
 
+    @Test
+    public void testReadClaimThenWriteThenReadMore() throws IOException {
+        final ContentClaim claim = repository.create(false);
+
+        final OutputStream out = repository.write(claim);
+        out.write("hello".getBytes());
+        out.flush();
+
+        final InputStream in = repository.read(claim);
+        final byte[] buffer = new byte[5];
+        StreamUtils.fillBuffer(in, buffer);
+
+        assertEquals("hello", new String(buffer));
+
+        out.write("good-bye".getBytes());
+        out.close();
+
+        final byte[] buffer2 = new byte[8];
+        StreamUtils.fillBuffer(in, buffer2);
+        assertEquals("good-bye", new String(buffer2));
+    }
+
     @Test
     public void testClaimantCounts() throws IOException {
         final ContentClaim claim = repository.create(true);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestLimitedInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestLimitedInputStream.java
index 7b1e64d7a8..129fed6ab0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestLimitedInputStream.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestLimitedInputStream.java
@@ -16,14 +16,14 @@
  */
 package org.apache.nifi.controller.repository.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.fail;
+import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class TestLimitedInputStream {
 
@@ -70,9 +70,11 @@ public void testByteArrayReadWithRange() throws IOException {
     @Test
     public void testSkip() throws Exception {
         final LimitedInputStream lis = new LimitedInputStream(bais, 4);
+        lis.mark(4);
         assertEquals(3, lis.read(buffer3));
         assertEquals(1, lis.skip(data.length));
         lis.reset();
+        lis.mark(4);
         assertEquals(4, lis.skip(7));
         lis.reset();
         assertEquals(2, lis.skip(2));
@@ -91,7 +93,7 @@ public void testClose() {
     @Test
     public void testAvailable() throws Exception {
         final LimitedInputStream lis = new LimitedInputStream(bais, 4);
-        assertNotEquals(data.length, lis.available());
+        assertEquals(4, lis.available());
         lis.reset();
         assertEquals(4, lis.available());
         assertEquals(1, lis.read(buffer3, 0, 1));
@@ -107,14 +109,15 @@ public void testMarkSupported() {
     @Test
     public void testMark() throws Exception {
         final LimitedInputStream lis = new LimitedInputStream(bais, 6);
+        lis.mark(1000);
         assertEquals(3, lis.read(buffer3));
         assertEquals(3, lis.read(buffer10));
         lis.reset();
-        assertEquals(3, lis.read(buffer3));
         lis.mark(1000);
+        assertEquals(3, lis.read(buffer3));
         assertEquals(3, lis.read(buffer10));
         lis.reset();
-        assertEquals(3, lis.read(buffer10));
+        assertEquals(6, lis.read(buffer10));
     }
 
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services