You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/06/08 20:47:43 UTC

nifi git commit: NIFI-5200: Fixed issue with InputStream being closed when calling ProcessSession.read() twice against sequential Content Claims

Repository: nifi
Updated Branches:
  refs/heads/master 4bccab7e0 -> 00a63d17a


NIFI-5200: Fixed issue with InputStream being closed when calling ProcessSession.read() twice against sequential Content Claims

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2753


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/00a63d17
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/00a63d17
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/00a63d17

Branch: refs/heads/master
Commit: 00a63d17af3c82727b9119acb00fccfcf6639fc5
Parents: 4bccab7
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jun 1 11:14:56 2018 -0400
Committer: Matthew Burgess <ma...@apache.org>
Committed: Fri Jun 8 16:47:28 2018 -0400

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java       | 10 +++++-----
 .../repository/TestStandardProcessSession.java   | 19 +++++++++++++++++++
 2 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/00a63d17/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 2750db6..12bcafd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2141,14 +2141,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
                 currentReadClaim = claim;
 
+                currentReadClaimStream = new ByteCountingInputStream(rawInStream);
+                StreamUtils.skip(currentReadClaimStream, offset);
+
                 // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
                 // reuse the same InputStream for the next FlowFile
-                final InputStream disableOnClose = new DisableOnCloseInputStream(rawInStream);
-
-                currentReadClaimStream = new ByteCountingInputStream(disableOnClose);
-                StreamUtils.skip(currentReadClaimStream, offset);
+                final InputStream disableOnClose = new DisableOnCloseInputStream(currentReadClaimStream);
 
-                return currentReadClaimStream;
+                return disableOnClose;
             } else {
                 claimCache.flush(claim);
                 final InputStream rawInStream = context.getContentRepository().read(claim);

http://git-wip-us.apache.org/repos/asf/nifi/blob/00a63d17/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 55fa232..f47be4d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -350,6 +350,25 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testSequentialReads() throws IOException {
+        FlowFile ff1 = session.write(session.create(), out -> out.write(new byte[] {'A', 'B'}));
+        FlowFile ff2 = session.write(session.create(), out -> out.write('C'));
+
+        final byte[] buff1 = new byte[2];
+        try (final InputStream in = session.read(ff1)) {
+            StreamUtils.fillBuffer(in, buff1);
+        }
+
+        final byte[] buff2 = new byte[1];
+        try (final InputStream in = session.read(ff2)) {
+            StreamUtils.fillBuffer(in, buff2);
+        }
+
+        Assert.assertArrayEquals(new byte[] {'A', 'B'}, buff1);
+        Assert.assertArrayEquals(new byte[] {'C'}, buff2);
+    }
+
+    @Test
     public void testCloneOriginalDataLarger() throws IOException {
         final byte[] originalContent = "hello there 12345".getBytes();
         final byte[] replacementContent = "NEW DATA".getBytes();