You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/27 01:50:13 UTC

nifi git commit: NIFI-516 adding option to StandardProcessSession.read to close stream

Repository: nifi
Updated Branches:
  refs/heads/master 1c1738670 -> b885f955f


NIFI-516 adding option to StandardProcessSession.read to close stream

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b885f955
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b885f955
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b885f955

Branch: refs/heads/master
Commit: b885f955f4ee97caeaf5c3a28aab967db1be4c94
Parents: 1c17386
Author: Joseph Percivall <jo...@yahoo.com>
Authored: Wed Oct 7 10:50:07 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Oct 26 20:23:13 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processor/ProcessSession.java   | 27 ++++++++++++++
 .../apache/nifi/util/MockProcessSession.java    |  8 ++++
 .../repository/BatchingSessionFactory.java      |  5 +++
 .../repository/StandardProcessSession.java      | 18 +++++++--
 .../repository/TestStandardProcessSession.java  | 39 +++++++++++++++++++-
 .../nifi/processors/standard/MergeContent.java  |  6 +--
 6 files changed, 95 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
index ed46d68..e1e98d5 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
@@ -509,6 +509,33 @@ public interface ProcessSession {
     void read(FlowFile source, InputStreamCallback reader) throws FlowFileAccessException;
 
     /**
+     * Executes the given callback against the contents corresponding to the
+     * given FlowFile.
+     *
+     * <i>Note</i>: The InputStream provided to the given InputStreamCallback
+     * will not be accessible once this method has completed its execution.
+     *
+     * @param source flowfile to retrieve content of
+     * @param allowSessionStreamManagement allow session to hold the stream open for performance reasons
+     * @param reader that will be called to read the flowfile content
+     * @throws IllegalStateException if detected that this method is being
+     * called from within a callback of another method in this session and for
+     * the given FlowFile(s)
+     * @throws FlowFileHandlingException if the given FlowFile is already
+     * transferred or removed or doesn't belong to this session. Automatic
+     * rollback will occur.
+     * @throws MissingFlowFileException if the given FlowFile content cannot be
+     * found. The FlowFile should no longer be referenced, will be internally
+     * destroyed, and the session is automatically rolled back and what is left
+     * of the FlowFile is destroyed.
+     * @throws FlowFileAccessException if some IO problem occurs accessing
+     * FlowFile content; if an attempt is made to access the InputStream
+     * provided to the given InputStreamCallback after this method completed its
+     * execution
+     */
+    void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) throws FlowFileAccessException;
+
+    /**
      * Combines the content of all given source FlowFiles into a single given
      * destination FlowFile.
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 1060854..2045acd 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -400,6 +400,11 @@ public class MockProcessSession implements ProcessSession {
 
     @Override
     public void read(final FlowFile flowFile, final InputStreamCallback callback) {
+        read(flowFile, false, callback);
+    }
+
+    @Override
+    public void read(final FlowFile flowFile, boolean allowSessionStreamManagement, final InputStreamCallback callback) {
         if (callback == null || flowFile == null) {
             throw new IllegalArgumentException("argument cannot be null");
         }
@@ -413,6 +418,9 @@ public class MockProcessSession implements ProcessSession {
         final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
         try {
             callback.process(bais);
+            if(!allowSessionStreamManagement){
+                bais.close();
+            }
         } catch (final IOException e) {
             throw new ProcessException(e.toString(), e);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
index d5dba82..083510d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
@@ -188,6 +188,11 @@ public class BatchingSessionFactory implements ProcessSessionFactory {
         }
 
         @Override
+        public void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) {
+            session.read(source, allowSessionStreamManagement, reader);
+        }
+
+        @Override
         public FlowFile merge(Collection<FlowFile> sources, FlowFile destination) {
             return session.merge(sources, destination);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 3ba7e4e..98a0b87 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1770,6 +1770,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     @Override
     public void read(final FlowFile source, final InputStreamCallback reader) {
+        read(source, false, reader);
+    }
+
+    @Override
+    public void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) {
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
 
@@ -1780,9 +1785,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
-                final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
-                final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
-                final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+            final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
+            final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+            final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
 
             // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
             // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@@ -1795,6 +1800,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             try {
                 recursionSet.add(source);
                 reader.process(ffais);
+
+                // Unless the caller lets the session manage the stream, close it right after reading to avoid keeping too many files open.
+                if(!allowSessionStreamManagement){
+                    currentReadClaimStream.close();
+                    currentReadClaimStream = null;
+                }
             } catch (final ContentNotFoundException cnfe) {
                 cnfeThrown = true;
                 throw cnfe;
@@ -1806,6 +1817,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     throw ffais.getContentNotFoundException();
                 }
             }
+
         } catch (final ContentNotFoundException nfe) {
             handleContentNotFound(nfe, record);
         } catch (final IOException ex) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 0e11923..743e185 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -85,6 +85,7 @@ public class TestStandardProcessSession {
     private StandardProcessSession session;
     private MockContentRepository contentRepo;
     private FlowFileQueue flowFileQueue;
+    private ProcessContext context;
 
     private ProvenanceEventRepository provenanceRepo;
     private MockFlowFileRepository flowFileRepo;
@@ -187,7 +188,7 @@ public class TestStandardProcessSession {
         contentRepo.initialize(new StandardResourceClaimManager());
         flowFileRepo = new MockFlowFileRepository();
 
-        final ProcessContext context = new ProcessContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
+        context = new ProcessContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
         session = new StandardProcessSession(context);
     }
 
@@ -329,7 +330,7 @@ public class TestStandardProcessSession {
         final FlowFile flowFile = session.get();
         assertNotNull(flowFile);
         final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null);
-        session.read(flowFile, new InputStreamCallback() {
+        session.read(flowFile, true , new InputStreamCallback() {
             @Override
             public void process(final InputStream inputStream) throws IOException {
                 inputStreamHolder.set(inputStream);
@@ -721,6 +722,40 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testManyFilesOpened() throws IOException {
+
+        StandardProcessSession[] standardProcessSessions = new StandardProcessSession[100000];
+        for(int i = 0; i<70000;i++){
+            standardProcessSessions[i] = new StandardProcessSession(context);
+
+            FlowFile flowFile = standardProcessSessions[i].create();
+            final byte[] buff = new byte["Hello".getBytes().length];
+
+            flowFile = standardProcessSessions[i].append(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(OutputStream out) throws IOException {
+                    out.write("Hello".getBytes());
+                }
+            });
+
+            try {
+                standardProcessSessions[i].read(flowFile, false, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream in) throws IOException {
+                        StreamUtils.fillBuffer(in, buff);
+                    }
+                });
+            } catch (Exception e){
+                System.out.println("Failed at file:"+i);
+                throw e;
+            }
+            if(i%1000==0){
+                System.out.println("i:"+i);
+            }
+        }
+    }
+
+    @Test
     public void testMissingFlowFileExceptionThrownWhenUnableToReadDataStreamCallback() {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")

http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 2cad11e..afc2705 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -567,7 +567,7 @@ public class MergeContent extends BinFiles {
                     final Iterator<FlowFileSessionWrapper> itr = wrappers.iterator();
                     while (itr.hasNext()) {
                         final FlowFileSessionWrapper wrapper = itr.next();
-                        wrapper.getSession().read(wrapper.getFlowFile(), new InputStreamCallback() {
+                        wrapper.getSession().read(wrapper.getFlowFile(), false, new InputStreamCallback() {
                             @Override
                             public void process(final InputStream in) throws IOException {
                                 StreamUtils.copy(in, out);
@@ -780,7 +780,7 @@ public class MergeContent extends BinFiles {
 
                         for (final FlowFileSessionWrapper wrapper : wrappers) {
                             final FlowFile flowFile = wrapper.getFlowFile();
-                            wrapper.getSession().read(flowFile, new InputStreamCallback() {
+                            wrapper.getSession().read(flowFile, false, new InputStreamCallback() {
                                 @Override
                                 public void process(final InputStream rawIn) throws IOException {
                                     try (final InputStream in = new BufferedInputStream(rawIn)) {
@@ -893,7 +893,7 @@ public class MergeContent extends BinFiles {
                     try (final OutputStream out = new BufferedOutputStream(rawOut)) {
                         for (final FlowFileSessionWrapper wrapper : wrappers) {
                             final FlowFile flowFile = wrapper.getFlowFile();
-                            wrapper.getSession().read(flowFile, new InputStreamCallback() {
+                            wrapper.getSession().read(flowFile, false, new InputStreamCallback() {
                                 @Override
                                 public void process(InputStream in) throws IOException {
                                     boolean canMerge = true;