You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "Mark Payne (JIRA)" <ji...@apache.org> on 2018/06/01 15:19:00 UTC

[jira] [Commented] (NIFI-5200) Nested ProcessSession.read resulting in outer stream being closed.

    [ https://issues.apache.org/jira/browse/NIFI-5200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498116#comment-16498116 ] 

Mark Payne commented on NIFI-5200:
----------------------------------

Re-Opening Jira as it introduced a new bug. In the case where `ProcessSession.read(flowFile1)` is called, followed by `ProcessSession.read(flowFile2)` within the same ProcessSession, if flowFile2's content is in the same resource claim as flowFile1, but at a later position in the claim, then an Exception is thrown when attempting to read from the stream for flowFile2:
{code:java}
2018-06-01 10:39:58,661 ERROR [Timer-Driven Process Thread-10] o.a.nifi.processors.standard.QueryRecord QueryRecord[id=9c2efcee-d6db-3017-c02d-02cc3b76b759] Unable to query StandardFlowFileRecord[uuid=06a17875-275f-47b8-b7fa-c90e43dd024f,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1527275483034-20, container=default, section=20], offset=747772, length=186943],offset=0,name=1.log,size=186943] due to org.apache.nifi.processor.exception.ProcessException: Failed to read next record in stream for StandardFlowFileRecord[uuid=06a17875-275f-47b8-b7fa-c90e43dd024f,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1527275483034-20, container=default, section=20], offset=747772, length=186943],offset=0,name=1.log,size=186943]: org.apache.nifi.processor.exception.ProcessException: Failed to read next record in stream for StandardFlowFileRecord[uuid=06a17875-275f-47b8-b7fa-c90e43dd024f,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1527275483034-20, container=default, section=20], offset=747772, length=186943],offset=0,name=1.log,size=186943]
org.apache.nifi.processor.exception.ProcessException: Failed to read next record in stream for StandardFlowFileRecord[uuid=06a17875-275f-47b8-b7fa-c90e43dd024f,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1527275483034-20, container=default, section=20], offset=747772, length=186943],offset=0,name=1.log,size=186943]
at org.apache.nifi.queryrecord.FlowFileEnumerator.moveNext(FlowFileEnumerator.java:65)
at Baz$1$1.moveNext(Unknown Source)
at org.apache.calcite.linq4j.Linq4j$EnumeratorIterator.<init>(Linq4j.java:664)
at org.apache.calcite.linq4j.Linq4j.enumeratorIterator(Linq4j.java:98)
at org.apache.calcite.linq4j.AbstractEnumerable.iterator(AbstractEnumerable.java:33)
at org.apache.calcite.avatica.MetaImpl.createCursor(MetaImpl.java:89)
at org.apache.calcite.avatica.AvaticaResultSet.execute(AvaticaResultSet.java:196)
at org.apache.calcite.jdbc.CalciteResultSet.execute(CalciteResultSet.java:67)
at org.apache.calcite.jdbc.CalciteResultSet.execute(CalciteResultSet.java:44)
at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:513)
at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:132)
at org.apache.nifi.processors.standard.QueryRecord.queryWithCache(QueryRecord.java:470)
at org.apache.nifi.processors.standard.QueryRecord.onTrigger(QueryRecord.java:301)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.nifi.processor.exception.FlowFileAccessException: Could not read from StandardFlowFileRecord[uuid=06a17875-275f-47b8-b7fa-c90e43dd024f,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1527275483034-20, container=default, section=20], offset=747772, length=186943],offset=0,name=1.log,size=186943]
at org.apache.nifi.controller.repository.io.FlowFileAccessInputStream.read(FlowFileAccessInputStream.java:93)
at org.apache.nifi.controller.repository.StandardProcessSession$6.read(StandardProcessSession.java:2284)
at org.apache.nifi.controller.repository.io.TaskTerminationInputStream.read(TaskTerminationInputStream.java:68)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at org.apache.nifi.grok.GrokRecordReader.nextRecord(GrokRecordReader.java:85)
at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
at org.apache.nifi.queryrecord.FlowFileEnumerator.moveNext(FlowFileEnumerator.java:62)
... 23 common frames omitted
Caused by: java.io.IOException: Stream is closed
at org.apache.nifi.controller.repository.io.DisableOnCloseInputStream.checkClosed(DisableOnCloseInputStream.java:65)
at org.apache.nifi.controller.repository.io.DisableOnCloseInputStream.read(DisableOnCloseInputStream.java:48)
at org.apache.nifi.stream.io.ByteCountingInputStream.read(ByteCountingInputStream.java:51)
at org.apache.nifi.controller.repository.io.DisableOnCloseInputStream.read(DisableOnCloseInputStream.java:49)
at org.apache.nifi.controller.repository.io.LimitedInputStream.read(LimitedInputStream.java:69)
at org.apache.nifi.stream.io.ByteCountingInputStream.read(ByteCountingInputStream.java:51)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at org.apache.nifi.controller.repository.io.FlowFileAccessInputStream.read(FlowFileAccessInputStream.java:82)
... 35 common frames omitted{code}

> Nested ProcessSession.read resulting in outer stream being closed.
> ------------------------------------------------------------------
>
>                 Key: NIFI-5200
>                 URL: https://issues.apache.org/jira/browse/NIFI-5200
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.6.0
>            Reporter: Peter Radden
>            Assignee: Mark Payne
>            Priority: Minor
>             Fix For: 1.7.0
>
>
> Consider this example processor:
> {code:java}
> FlowFile ff1 = session.write(session.create(),
>     (out) -> { out.write(new byte[]{ 'A', 'B' }); });
> FlowFile ff2 = session.write(session.create(),
>     (out) -> { out.write('C'); });
> session.read(ff1,
>     (in1) -> {
>         int a = in1.read();
>         session.read(ff2, (in2) -> { int c = in2.read(); });
>         int b = in1.read();
>     });
> session.transfer(ff1, REL_SUCCESS);
> session.transfer(ff2, REL_SUCCESS);{code}
> The expectation is that a='A', b='B' and c='C'.
> The actual result is that the final call to in1.read() throws due to the underlying stream being closed by the previous session.read on ff2.
> A workaround seems to be to pass the optional parameter to session.read of allowSessionStreamManagement=true.
> Is this expected that nested reads used in this way will not work?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)