You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/27 01:50:13 UTC
nifi git commit: NIFI-516 adding option to
StandardProcessSession.read to close stream
Repository: nifi
Updated Branches:
refs/heads/master 1c1738670 -> b885f955f
NIFI-516 adding option to StandardProcessSession.read to close stream
Signed-off-by: Mark Payne <ma...@hotmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b885f955
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b885f955
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b885f955
Branch: refs/heads/master
Commit: b885f955f4ee97caeaf5c3a28aab967db1be4c94
Parents: 1c17386
Author: Joseph Percivall <jo...@yahoo.com>
Authored: Wed Oct 7 10:50:07 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Oct 26 20:23:13 2015 -0400
----------------------------------------------------------------------
.../apache/nifi/processor/ProcessSession.java | 27 ++++++++++++++
.../apache/nifi/util/MockProcessSession.java | 8 ++++
.../repository/BatchingSessionFactory.java | 5 +++
.../repository/StandardProcessSession.java | 18 +++++++--
.../repository/TestStandardProcessSession.java | 39 +++++++++++++++++++-
.../nifi/processors/standard/MergeContent.java | 6 +--
6 files changed, 95 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
index ed46d68..e1e98d5 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
@@ -509,6 +509,33 @@ public interface ProcessSession {
void read(FlowFile source, InputStreamCallback reader) throws FlowFileAccessException;
/**
+ * Executes the given callback against the contents corresponding to the
+ * given FlowFile.
+ *
+ * <i>Note</i>: The OutputStream provided to the given OutputStreamCallback
+ * will not be accessible once this method has completed its execution.
+ *
+ * @param source flowfile to retrieve content of
+ * @param allowSessionStreamManagement allow session to hold the stream open for performance reasons
+ * @param reader that will be called to read the flowfile content
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be reference, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing
+ * FlowFile content; if an attempt is made to access the InputStream
+ * provided to the given InputStreamCallback after this method completed its
+ * execution
+ */
+ void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) throws FlowFileAccessException;
+
+ /**
* Combines the content of all given source FlowFiles into a single given
* destination FlowFile.
*
http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 1060854..2045acd 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -400,6 +400,11 @@ public class MockProcessSession implements ProcessSession {
@Override
public void read(final FlowFile flowFile, final InputStreamCallback callback) {
+ read(flowFile, false, callback);
+ }
+
+ @Override
+ public void read(final FlowFile flowFile, boolean allowSessionStreamManagement, final InputStreamCallback callback) {
if (callback == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@@ -413,6 +418,9 @@ public class MockProcessSession implements ProcessSession {
final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
try {
callback.process(bais);
+ if(!allowSessionStreamManagement){
+ bais.close();
+ }
} catch (final IOException e) {
throw new ProcessException(e.toString(), e);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
index d5dba82..083510d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
@@ -188,6 +188,11 @@ public class BatchingSessionFactory implements ProcessSessionFactory {
}
@Override
+ public void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) {
+ session.read(source, allowSessionStreamManagement, reader);
+ }
+
+ @Override
public FlowFile merge(Collection<FlowFile> sources, FlowFile destination) {
return session.merge(sources, destination);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 3ba7e4e..98a0b87 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1770,6 +1770,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void read(final FlowFile source, final InputStreamCallback reader) {
+ read(source, false, reader);
+ }
+
+ @Override
+ public void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) {
validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
@@ -1780,9 +1785,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
- final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
- final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
- final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+ final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
+ final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+ final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
// We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
// Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@@ -1795,6 +1800,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
try {
recursionSet.add(source);
reader.process(ffais);
+
+ // Allow processors to close the file after reading to avoid too many files open or do smart session stream management.
+ if(!allowSessionStreamManagement){
+ currentReadClaimStream.close();
+ currentReadClaimStream = null;
+ }
} catch (final ContentNotFoundException cnfe) {
cnfeThrown = true;
throw cnfe;
@@ -1806,6 +1817,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
throw ffais.getContentNotFoundException();
}
}
+
} catch (final ContentNotFoundException nfe) {
handleContentNotFound(nfe, record);
} catch (final IOException ex) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 0e11923..743e185 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -85,6 +85,7 @@ public class TestStandardProcessSession {
private StandardProcessSession session;
private MockContentRepository contentRepo;
private FlowFileQueue flowFileQueue;
+ private ProcessContext context;
private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo;
@@ -187,7 +188,7 @@ public class TestStandardProcessSession {
contentRepo.initialize(new StandardResourceClaimManager());
flowFileRepo = new MockFlowFileRepository();
- final ProcessContext context = new ProcessContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
+ context = new ProcessContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
session = new StandardProcessSession(context);
}
@@ -329,7 +330,7 @@ public class TestStandardProcessSession {
final FlowFile flowFile = session.get();
assertNotNull(flowFile);
final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null);
- session.read(flowFile, new InputStreamCallback() {
+ session.read(flowFile, true , new InputStreamCallback() {
@Override
public void process(final InputStream inputStream) throws IOException {
inputStreamHolder.set(inputStream);
@@ -721,6 +722,40 @@ public class TestStandardProcessSession {
}
@Test
+ public void testManyFilesOpened() throws IOException {
+
+ StandardProcessSession[] standardProcessSessions = new StandardProcessSession[100000];
+ for(int i = 0; i<70000;i++){
+ standardProcessSessions[i] = new StandardProcessSession(context);
+
+ FlowFile flowFile = standardProcessSessions[i].create();
+ final byte[] buff = new byte["Hello".getBytes().length];
+
+ flowFile = standardProcessSessions[i].append(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ out.write("Hello".getBytes());
+ }
+ });
+
+ try {
+ standardProcessSessions[i].read(flowFile, false, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, buff);
+ }
+ });
+ } catch (Exception e){
+ System.out.println("Failed at file:"+i);
+ throw e;
+ }
+ if(i%1000==0){
+ System.out.println("i:"+i);
+ }
+ }
+ }
+
+ @Test
public void testMissingFlowFileExceptionThrownWhenUnableToReadDataStreamCallback() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 2cad11e..afc2705 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -567,7 +567,7 @@ public class MergeContent extends BinFiles {
final Iterator<FlowFileSessionWrapper> itr = wrappers.iterator();
while (itr.hasNext()) {
final FlowFileSessionWrapper wrapper = itr.next();
- wrapper.getSession().read(wrapper.getFlowFile(), new InputStreamCallback() {
+ wrapper.getSession().read(wrapper.getFlowFile(), false, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.copy(in, out);
@@ -780,7 +780,7 @@ public class MergeContent extends BinFiles {
for (final FlowFileSessionWrapper wrapper : wrappers) {
final FlowFile flowFile = wrapper.getFlowFile();
- wrapper.getSession().read(flowFile, new InputStreamCallback() {
+ wrapper.getSession().read(flowFile, false, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
@@ -893,7 +893,7 @@ public class MergeContent extends BinFiles {
try (final OutputStream out = new BufferedOutputStream(rawOut)) {
for (final FlowFileSessionWrapper wrapper : wrappers) {
final FlowFile flowFile = wrapper.getFlowFile();
- wrapper.getSession().read(flowFile, new InputStreamCallback() {
+ wrapper.getSession().read(flowFile, false, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
boolean canMerge = true;