You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by Russell Bateman <ru...@windofkeltia.com> on 2020/03/27 15:24:56 UTC

Reading the incoming flowfile "twice"

In my custom processor, I'm using a SAX parser to process an incoming 
flowfile that's in XML. Except that, this particular XML is in essence 
two different files and I would like to split, read and process the 
first "half", which starts a couple of lines (XML elements) into the 
file) not using the SAX parser. At the end, I would stream the output of 
the first half, then the SAX-processed second half.

So, in short:

 1. process the incoming flowfile for the early content not using SAX,
    but merely copying as-is; at all cost I must avoid "reassembling"
    the first half using my SAX handler (what I'm doing now),
 2. output the first part down the output stream to the resulting flowfile,
 3. (re)process the incoming flowfile using SAX (and I can just skip
    over the first bit) and spitting the result of this second part out
    down the output stream of the resulting flowfile.

I guess this is tantamount to asking how, in Java, I can read an input 
stream twice (or one-half plus one times). Maybe it's less a NiFi 
developer question and more a Java question. I have looked at it that 
way too, but, if one of you knows (particularly NiFi) best practice, I 
would very much like to hear about it.

Thanks.


Re: Reading the incoming flowfile "twice"

Posted by Otto Fowler <ot...@gmail.com>.
Oh, sorry I did not understand that you needed to do that




On March 31, 2020 at 11:22:20, Russell Bateman (russ@windofkeltia.com)
wrote:

Yes, I though of that, but there's no way to insert completing XML
structure into the input stream ahead of (<metadata>). SAX will choke if
I just start feeding it the flowfile where I left off from copying up to
</document>.

On 3/30/20 8:25 PM, Otto Fowler wrote:
> Can I ask why you would consume the whole stream when doing the non-sax
> part? If you consume the stream right up to the sax part ( the stream POS
> is at the start of the xml ) then you can just pass the stream to sax as
is
> can’t you?
>
>
>
>
> On March 30, 2020 at 16:23:27, Russell Bateman (russ@windofkeltia.com)
> wrote:
>
> If I haven't worn out my welcome, here is the simplified code that should
> demonstrate either that I have miscoded your suggestions or that the API
> doesn't in fact work as advertised. First, the output. The code, both
JUnit
> test and processor are attached and the files are pretty small.
>
> Much thanks,
> Russ
>
> This is the input stream first time around (before copying)
> ===================================
> * * * session.read( flowfile );
> Here's what's in input stream:
> *<cxml>*
> * <document>*
> * This is the original document.*
> * </document>*
> * <metadata>*
> * <date_of_service>2016-06-28 13:23</date_of_service>*
> * </metadata>*
> * <demographics>*
> * <date_of_birth>1980-07-01</date_of_birth>*
> * <age>36</age>*
> * </demographics>*
> *</cxml>*
>
> And now, let's copy some of the input stream to the output stream
> =============================
> * * * flowfile = session.write( flowfile, new StreamCallback() ...
> Copying input stream to output stream up to </document>...
> The output stream has in it at this point:
> *<cxml>*
> * <document>*
> * This is the original document.*
> * </document>*
>
> [1. When we examine the output stream, it has what we expect.]
>
> After copying, can we reopen input stream intact and does outputstream
have
> what we think? ====
> * * * flowfile = session.write( flowfile, new StreamCallback() ...
> Here's what's in input stream:
> *<cxml>*
> * <document>*
> * This is the original document.*
> * </document>*
>
> [2. The input stream as reported just above is truncated by exactly the
> content we did
> not copy to the output stream. We expected to see the entire,
> original file, but the
> second half is gone.]
>
> Here's what's in the output stream at this point:
> * (nothing)*
>
> [3. The content we copied to the output stream has disappeared. Does it
> disappear simply
> because we looked at it (printed it out here)?]
>
>
> On 3/29/20 5:05 AM, Joe Witt wrote:
>
> Russell
>
> I recommend writing very simple code that does two successive read/write
> operations on basic data so you can make sure the api work/as expected.
> Then add the xml bits.
>
> Thanks
>
> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <mi...@gmail.com>
> <mi...@gmail.com> wrote:
>
>
> If these files are only a few MB at the most, you can also just export
them
> to a ByteArrayOutputStream. Just a thought.
>
> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman
> <ru...@windofkeltia.com> <ru...@windofkeltia.com>
> wrote:
>
>
> Joe and Mike,
>
> Sadly, I was not able to get very far on this. It seems that the extend
> to which I copy the first half of the contents of the input stream, I
> lose what comes after when I try to read again, basically, the second
> half comprising the <metadata>and <demographics>elements which I was
> hoping to SAX-parse. Here's code and output. I have highlighted the
> output to make it easier to read.
>
> ? <#>
> |try|
> |{|
> |||InputStream inputStream = session.read( flowfile );|
> |||System.out.println( ||"This is the input stream first time around
> (before copying to output stream)..."| |);|
> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
> |||inputStream.close();|
> |}|
> |catch||( IOException e )|
> |{|
> |||e.printStackTrace();|
> |}|
> |flowfile = session.write( flowfile, ||new| |StreamCallback()|
> |{|
> |||@Override|
> |||public| |void| |process( InputStream inputStream, OutputStream
> outputStream ) ||throws| |IOException|
> |||{|
> |||System.out.println( ||"And now, let's copy..."| |);|
> |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
> outputStream );|
> |||}|
> |} );|
> |try|
> |{|
> |||InputStream inputStream = session.read( flowfile );|
> |||System.out.println( ||"This is the input stream second time around
> (after copying)..."| |);|
> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
> |||inputStream.close();|
> |}|
> |catch||( IOException e )|
> |{|
> |||e.printStackTrace();|
> |}|
> |// ...on to SAX parser which dies because the input has been truncated
>
> to|
>
> |// exactly what was written out to the output stream|
>
>
> Output of above:
>
> This is the input stream first time around (before copying to output
> stream)...
> <cxml>
> <document>
> This is the original document.
> </document>
> <metadata>
> <date_of_service>2016-06-28 13:23</date_of_service>
> </metadata>
> <demographics>
> <date_of_birth>1980-07-01</date_of_birth>
> <age>36</age>
> </demographics>
> </cxml>
>
> And now, let's copy...
> This is the input stream second time around (after copying)...
> <cxml>
> <document>
> This is the original document.
> </document>
> And now, we'll go on to the SAX parser...
> <cxml> <document> This is the original document. </document>
> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
> document structures must start and end within the same entity.
>
>
> I left off the code that prints, "And now, we'll go on to the SAX
> parser..." It's in the next flowfile = session.write( ... ). I have unit
> tests that verify the good functioning of
> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
> "file" is truncated; SAX finds the first "half" just fine, but there is
> no second "half". If I comment out copying from input stream to output
> stream, the error doesn't occur--the whole document is there.
>
> Thanks for looking at this again if you can,
> Russ
>
> On 3/27/20 3:08 PM, Joe Witt wrote:
>
> you should be able to call write as many times as you need. just keep
> using the resulting flowfile reference into the next call.
>
> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com
>
> wrote:
>
>
> Mike,
>
> Many thanks for responding. Do you mean to say that all I have to do
>
> is
>
> something like this?
>
> public void onTrigger( final ProcessContext context, final
> ProcessSession session ) throws ProcessException
> {
> FlowFile flowfile = session.get();
> ...
>
> // this is will be our resulting flowfile...
> AtomicReference< OutputStream > savedOutputStream = new
> AtomicReference<>();
>
> /* Do some processing on the in-coming flowfile then close its
> input stream, but
> * save the output stream for continued use.
> */
> * session.write( flowfile, new InputStreamCallback()*
> {
> @Override
> * public void process( InputStream inputStream, OutputStream
> outputStream ) throws IOException*
> {
> savedOutputStream.set( outputStream );
> ...
>
> // processing puts some output on the output stream...
> outputStream.write( etc. );
>
> inputStream.close();
> }
> * } );*
>
> /* Start over doing different processing on the
>
> (same/reopened)
>
> in-coming flowfile
> * continuing to use the original output stream. It's our
> responsibility to close
> * the saved output stream, NiFi closes the unused output
>
> stream
>
> opened, but
> * ignored by us.
> */
> * session.write( flowfile, new StreamCallback()*
> {
> @Override
> * public void process( InputStream inputStream, OutputStream
> outputStream ) throws IOException*
> {
> outputStream = savedOutputStream.get(); // (discard the
>
> new
>
> output stream)
> ...
>
> // processing puts (some more) output on the original
>
> output
>
> stream...
> outputStream.write( etc. );
>
> outputStream.close();
> }
> * } );*
>
> session.transfer( flowfile, etc. );
> }
>
> I'm wondering if this will work to "discard" the new output stream
> opened for me (the second time) and replace it with the original one
> which was probably closed when the first call to
> session.write()finished. What's on these streams is way too big for me
> to put them into temporary memory, say, a ByteArrayOutputStream.
>
> Russ
>
> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>
> session.read(FlowFile) just gives you an InputStream. You should be
>
> able
>
> to
>
> rerun that as many times as you want provided you properly close it.
>
> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
>
> russ@windofkeltia.com>
>
> wrote:
>
>
> In my custom processor, I'm using a SAX parser to process an
>
> incoming
>
> flowfile that's in XML. Except that, this particular XML is in
>
> essence
>
> two different files and I would like to split, read and process the
> first "half", which starts a couple of lines (XML elements) into the
> file) not using the SAX parser. At the end, I would stream the
>
> output
>
> of
>
> the first half, then the SAX-processed second half.
>
> So, in short:
>
> 1. process the incoming flowfile for the early content not using
>
> SAX,
>
> but merely copying as-is; at all cost I must avoid
>
> "reassembling"
>
> the first half using my SAX handler (what I'm doing now),
> 2. output the first part down the output stream to the resulting
>
> flowfile,
>
> 3. (re)process the incoming flowfile using SAX (and I can just
>
> skip
>
> over the first bit) and spitting the result of this second
>
> part
>
> out
>
> down the output stream of the resulting flowfile.
>
> I guess this is tantamount to asking how, in Java, I can read an
>
> input
>
> stream twice (or one-half plus one times). Maybe it's less a NiFi
> developer question and more a Java question. I have looked at it
>
> that
>
> way too, but, if one of you knows (particularly NiFi) best
>
> practice, I
>
> would very much like to hear about it.
>
> Thanks.
>

Re: Reading the incoming flowfile "twice"

Posted by Russell Bateman <ru...@windofkeltia.com>.
Yes, I though of that, but there's no way to insert completing XML 
structure into the input stream ahead of (<metadata>). SAX will choke if 
I just start feeding it the flowfile where I left off from copying up to 
</document>.

On 3/30/20 8:25 PM, Otto Fowler wrote:
> Can I ask why you would consume the whole stream when doing the non-sax
> part? If you consume the stream right up to the sax part ( the stream POS
> is at the start of the xml ) then you can just pass the stream to sax as is
> can’t you?
>
>
>
>
> On March 30, 2020 at 16:23:27, Russell Bateman (russ@windofkeltia.com)
> wrote:
>
> If I haven't worn out my welcome, here is the simplified code that should
> demonstrate either that I have miscoded your suggestions or that the API
> doesn't in fact work as advertised. First, the output. The code, both JUnit
> test and processor are attached and the files are pretty small.
>
> Much thanks,
> Russ
>
> This is the input stream first time around (before copying)
> ===================================
> * * * session.read( flowfile );
>        Here's what's in input stream:
> *<cxml>*
> *  <document>*
> *    This is the original document.*
> *  </document>*
> *  <metadata>*
> *    <date_of_service>2016-06-28 13:23</date_of_service>*
> *  </metadata>*
> *  <demographics>*
> *    <date_of_birth>1980-07-01</date_of_birth>*
> *    <age>36</age>*
> *  </demographics>*
> *</cxml>*
>
> And now, let's copy some of the input stream to the output stream
> =============================
> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>        Copying input stream to output stream up to </document>...
>        The output stream has in it at this point:
> *<cxml>*
> *  <document>*
> *    This is the original document.*
> *  </document>*
>
> [1. When we examine the output stream, it has what we expect.]
>
> After copying, can we reopen input stream intact and does outputstream have
> what we think? ====
> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>        Here's what's in input stream:
> *<cxml>*
> *  <document>*
> *    This is the original document.*
> *  </document>*
>
> [2. The input stream as reported just above is truncated by exactly the
> content we did
>        not copy to the output stream. We expected to see the entire,
> original file, but the
>        second half is gone.]
>
>        Here's what's in the output stream at this point:
> * (nothing)*
>
> [3. The content we copied to the output stream has disappeared. Does it
> disappear simply
>      because we looked at it (printed it out here)?]
>
>
> On 3/29/20 5:05 AM, Joe Witt wrote:
>
> Russell
>
> I recommend writing very simple code that does two successive read/write
> operations on basic data so you can make sure the api work/as expected.
> Then add the xml bits.
>
> Thanks
>
> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <mi...@gmail.com>
> <mi...@gmail.com> wrote:
>
>
> If these files are only a few MB at the most, you can also just export them
> to a ByteArrayOutputStream. Just a thought.
>
> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman
> <ru...@windofkeltia.com> <ru...@windofkeltia.com>
> wrote:
>
>
> Joe and Mike,
>
> Sadly, I was not able to get very far on this. It seems that the extend
> to which I copy the first half of the contents of the input stream, I
> lose what comes after when I try to read again, basically, the second
> half comprising the <metadata>and <demographics>elements which I was
> hoping to SAX-parse. Here's code and output. I have highlighted the
> output to make it easier to read.
>
> ? <#>
> |try|
> |{|
> |||InputStream inputStream = session.read( flowfile );|
> |||System.out.println( ||"This is the input stream first time around
> (before copying to output stream)..."| |);|
> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
> |||inputStream.close();|
> |}|
> |catch||( IOException e )|
> |{|
> |||e.printStackTrace();|
> |}|
> |flowfile = session.write( flowfile, ||new| |StreamCallback()|
> |{|
> |||@Override|
> |||public| |void| |process( InputStream inputStream, OutputStream
> outputStream ) ||throws| |IOException|
> |||{|
> |||System.out.println( ||"And now, let's copy..."| |);|
> |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
> outputStream );|
> |||}|
> |} );|
> |try|
> |{|
> |||InputStream inputStream = session.read( flowfile );|
> |||System.out.println( ||"This is the input stream second time around
> (after copying)..."| |);|
> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
> |||inputStream.close();|
> |}|
> |catch||( IOException e )|
> |{|
> |||e.printStackTrace();|
> |}|
> |// ...on to SAX parser which dies because the input has been truncated
>
> to|
>
> |// exactly what was written out to the output stream|
>
>
> Output of above:
>
> This is the input stream first time around (before copying to output
> stream)...
> <cxml>
>     <document>
>       This is the original document.
>     </document>
>     <metadata>
>       <date_of_service>2016-06-28 13:23</date_of_service>
>     </metadata>
>     <demographics>
>       <date_of_birth>1980-07-01</date_of_birth>
>       <age>36</age>
>     </demographics>
> </cxml>
>
> And now, let's copy...
> This is the input stream second time around (after copying)...
> <cxml>
>     <document>
>       This is the original document.
>     </document>
> And now, we'll go on to the SAX parser...
> <cxml> <document> This is the original document. </document>
> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
> document structures must start and end within the same entity.
>
>
> I left off the code that prints, "And now, we'll go on to the SAX
> parser..." It's in the next flowfile = session.write( ... ). I have unit
> tests that verify the good functioning of
> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
> "file" is truncated; SAX finds the first "half" just fine, but there is
> no second "half". If I comment out copying from input stream to output
> stream, the error doesn't occur--the whole document is there.
>
> Thanks for looking at this again if you can,
> Russ
>
> On 3/27/20 3:08 PM, Joe Witt wrote:
>
> you should be able to call write as many times as you need.  just keep
> using the resulting flowfile reference into the next call.
>
> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com
>
> wrote:
>
>
> Mike,
>
> Many thanks for responding. Do you mean to say that all I have to do
>
> is
>
> something like this?
>
>       public void onTrigger( final ProcessContext context, final
>       ProcessSession session ) throws ProcessException
>       {
>          FlowFile flowfile = session.get();
>          ...
>
>          // this is will be our resulting flowfile...
>          AtomicReference< OutputStream > savedOutputStream = new
>       AtomicReference<>();
>
>          /* Do some processing on the in-coming flowfile then close its
>       input stream, but
>           * save the output stream for continued use.
>           */
>       *  session.write( flowfile, new InputStreamCallback()*
>          {
>            @Override
>       *    public void process( InputStream inputStream, OutputStream
>       outputStream ) throws IOException*
>            {
>              savedOutputStream.set( outputStream );
>              ...
>
>              // processing puts some output on the output stream...
>              outputStream.write( etc. );
>
>              inputStream.close();
>            }
>       *  } );*
>
>          /* Start over doing different processing on the
>
> (same/reopened)
>
>       in-coming flowfile
>           * continuing to use the original output stream. It's our
>       responsibility to close
>           * the saved output stream, NiFi closes the unused output
>
> stream
>
>       opened, but
>           * ignored by us.
>           */
>       *  session.write( flowfile, new StreamCallback()*
>          {
>            @Override
>       *    public void process( InputStream inputStream, OutputStream
>       outputStream ) throws IOException*
>            {
>              outputStream = savedOutputStream.get(); // (discard the
>
> new
>
>       output stream)
>              ...
>
>              // processing puts (some more) output on the original
>
> output
>
>       stream...
>              outputStream.write( etc. );
>
>              outputStream.close();
>            }
>       *  } );*
>
>          session.transfer( flowfile, etc. );
>       }
>
> I'm wondering if this will work to "discard" the new output stream
> opened for me (the second time) and replace it with the original one
> which was probably closed when the first call to
> session.write()finished. What's on these streams is way too big for me
> to put them into temporary memory, say, a ByteArrayOutputStream.
>
> Russ
>
> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>
> session.read(FlowFile) just gives you an InputStream. You should be
>
> able
>
> to
>
> rerun that as many times as you want provided you properly close it.
>
> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
>
> russ@windofkeltia.com>
>
> wrote:
>
>
> In my custom processor, I'm using a SAX parser to process an
>
> incoming
>
> flowfile that's in XML. Except that, this particular XML is in
>
> essence
>
> two different files and I would like to split, read and process the
> first "half", which starts a couple of lines (XML elements) into the
> file) not using the SAX parser. At the end, I would stream the
>
> output
>
> of
>
> the first half, then the SAX-processed second half.
>
> So, in short:
>
>     1. process the incoming flowfile for the early content not using
>
> SAX,
>
>        but merely copying as-is; at all cost I must avoid
>
> "reassembling"
>
>        the first half using my SAX handler (what I'm doing now),
>     2. output the first part down the output stream to the resulting
>
> flowfile,
>
>     3. (re)process the incoming flowfile using SAX (and I can just
>
> skip
>
>        over the first bit) and spitting the result of this second
>
> part
>
> out
>
>        down the output stream of the resulting flowfile.
>
> I guess this is tantamount to asking how, in Java, I can read an
>
> input
>
> stream twice (or one-half plus one times). Maybe it's less a NiFi
> developer question and more a Java question. I have looked at it
>
> that
>
> way too, but, if one of you knows (particularly NiFi) best
>
> practice, I
>
> would very much like to hear about it.
>
> Thanks.
>


Re: Reading the incoming flowfile "twice"

Posted by Otto Fowler <ot...@gmail.com>.
Can I ask why you would consume the whole stream when doing the non-sax
part? If you consume the stream right up to the sax part ( the stream POS
is at the start of the xml ) then you can just pass the stream to sax as is
can’t you?




On March 30, 2020 at 16:23:27, Russell Bateman (russ@windofkeltia.com)
wrote:

If I haven't worn out my welcome, here is the simplified code that should
demonstrate either that I have miscoded your suggestions or that the API
doesn't in fact work as advertised. First, the output. The code, both JUnit
test and processor are attached and the files are pretty small.

Much thanks,
Russ

This is the input stream first time around (before copying)
===================================
* * * session.read( flowfile );
      Here's what's in input stream:
*<cxml>*
*  <document>*
*    This is the original document.*
*  </document>*
*  <metadata>*
*    <date_of_service>2016-06-28 13:23</date_of_service>*
*  </metadata>*
*  <demographics>*
*    <date_of_birth>1980-07-01</date_of_birth>*
*    <age>36</age>*
*  </demographics>*
*</cxml>*

And now, let's copy some of the input stream to the output stream
=============================
* * * flowfile = session.write( flowfile, new StreamCallback() ...
      Copying input stream to output stream up to </document>...
      The output stream has in it at this point:
*<cxml>*
*  <document>*
*    This is the original document.*
*  </document>*

[1. When we examine the output stream, it has what we expect.]

After copying, can we reopen input stream intact and does outputstream have
what we think? ====
* * * flowfile = session.write( flowfile, new StreamCallback() ...
      Here's what's in input stream:
*<cxml>*
*  <document>*
*    This is the original document.*
*  </document>*

[2. The input stream as reported just above is truncated by exactly the
content we did
      not copy to the output stream. We expected to see the entire,
original file, but the
      second half is gone.]

      Here's what's in the output stream at this point:
* (nothing)*

[3. The content we copied to the output stream has disappeared. Does it
disappear simply
    because we looked at it (printed it out here)?]


On 3/29/20 5:05 AM, Joe Witt wrote:

Russell

I recommend writing very simple code that does two successive read/write
operations on basic data so you can make sure the api work/as expected.
Then add the xml bits.

Thanks

On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <mi...@gmail.com>
<mi...@gmail.com> wrote:


If these files are only a few MB at the most, you can also just export them
to a ByteArrayOutputStream. Just a thought.

On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman
<ru...@windofkeltia.com> <ru...@windofkeltia.com>
wrote:


Joe and Mike,

Sadly, I was not able to get very far on this. It seems that the extend
to which I copy the first half of the contents of the input stream, I
lose what comes after when I try to read again, basically, the second
half comprising the <metadata>and <demographics>elements which I was
hoping to SAX-parse. Here's code and output. I have highlighted the
output to make it easier to read.

? <#>
|try|
|{|
|||InputStream inputStream = session.read( flowfile );|
|||System.out.println( ||"This is the input stream first time around
(before copying to output stream)..."| |);|
|||System.out.println( StreamUtilities.fromStream( inputStream ) );|
|||inputStream.close();|
|}|
|catch||( IOException e )|
|{|
|||e.printStackTrace();|
|}|
|flowfile = session.write( flowfile, ||new| |StreamCallback()|
|{|
|||@Override|
|||public| |void| |process( InputStream inputStream, OutputStream
outputStream ) ||throws| |IOException|
|||{|
|||System.out.println( ||"And now, let's copy..."| |);|
|||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
outputStream );|
|||}|
|} );|
|try|
|{|
|||InputStream inputStream = session.read( flowfile );|
|||System.out.println( ||"This is the input stream second time around
(after copying)..."| |);|
|||System.out.println( StreamUtilities.fromStream( inputStream ) );|
|||inputStream.close();|
|}|
|catch||( IOException e )|
|{|
|||e.printStackTrace();|
|}|
|// ...on to SAX parser which dies because the input has been truncated

to|

|// exactly what was written out to the output stream|


Output of above:

This is the input stream first time around (before copying to output
stream)...
<cxml>
   <document>
     This is the original document.
   </document>
   <metadata>
     <date_of_service>2016-06-28 13:23</date_of_service>
   </metadata>
   <demographics>
     <date_of_birth>1980-07-01</date_of_birth>
     <age>36</age>
   </demographics>
</cxml>

And now, let's copy...
This is the input stream second time around (after copying)...
<cxml>
   <document>
     This is the original document.
   </document>
And now, we'll go on to the SAX parser...
<cxml> <document> This is the original document. </document>
[pool-1-thread-1] ERROR [...] SAX ruleparser error:
org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
document structures must start and end within the same entity.


I left off the code that prints, "And now, we'll go on to the SAX
parser..." It's in the next flowfile = session.write( ... ). I have unit
tests that verify the good functioning of
copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
"file" is truncated; SAX finds the first "half" just fine, but there is
no second "half". If I comment out copying from input stream to output
stream, the error doesn't occur--the whole document is there.

Thanks for looking at this again if you can,
Russ

On 3/27/20 3:08 PM, Joe Witt wrote:

you should be able to call write as many times as you need.  just keep
using the resulting flowfile reference into the next call.

On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com

wrote:


Mike,

Many thanks for responding. Do you mean to say that all I have to do

is

something like this?

     public void onTrigger( final ProcessContext context, final
     ProcessSession session ) throws ProcessException
     {
        FlowFile flowfile = session.get();
        ...

        // this is will be our resulting flowfile...
        AtomicReference< OutputStream > savedOutputStream = new
     AtomicReference<>();

        /* Do some processing on the in-coming flowfile then close its
     input stream, but
         * save the output stream for continued use.
         */
     *  session.write( flowfile, new InputStreamCallback()*
        {
          @Override
     *    public void process( InputStream inputStream, OutputStream
     outputStream ) throws IOException*
          {
            savedOutputStream.set( outputStream );
            ...

            // processing puts some output on the output stream...
            outputStream.write( etc. );

            inputStream.close();
          }
     *  } );*

        /* Start over doing different processing on the

(same/reopened)

     in-coming flowfile
         * continuing to use the original output stream. It's our
     responsibility to close
         * the saved output stream, NiFi closes the unused output

stream

     opened, but
         * ignored by us.
         */
     *  session.write( flowfile, new StreamCallback()*
        {
          @Override
     *    public void process( InputStream inputStream, OutputStream
     outputStream ) throws IOException*
          {
            outputStream = savedOutputStream.get(); // (discard the

new

     output stream)
            ...

            // processing puts (some more) output on the original

output

     stream...
            outputStream.write( etc. );

            outputStream.close();
          }
     *  } );*

        session.transfer( flowfile, etc. );
     }

I'm wondering if this will work to "discard" the new output stream
opened for me (the second time) and replace it with the original one
which was probably closed when the first call to
session.write()finished. What's on these streams is way too big for me
to put them into temporary memory, say, a ByteArrayOutputStream.

Russ

On 3/27/20 10:03 AM, Mike Thomsen wrote:

session.read(FlowFile) just gives you an InputStream. You should be

able

to

rerun that as many times as you want provided you properly close it.

On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <

russ@windofkeltia.com>

wrote:


In my custom processor, I'm using a SAX parser to process an

incoming

flowfile that's in XML. Except that, this particular XML is in

essence

two different files and I would like to split, read and process the
first "half", which starts a couple of lines (XML elements) into the
file) not using the SAX parser. At the end, I would stream the

output

of

the first half, then the SAX-processed second half.

So, in short:

   1. process the incoming flowfile for the early content not using

SAX,

      but merely copying as-is; at all cost I must avoid

"reassembling"

      the first half using my SAX handler (what I'm doing now),
   2. output the first part down the output stream to the resulting

flowfile,

   3. (re)process the incoming flowfile using SAX (and I can just

skip

      over the first bit) and spitting the result of this second

part

out

      down the output stream of the resulting flowfile.

I guess this is tantamount to asking how, in Java, I can read an

input

stream twice (or one-half plus one times). Maybe it's less a NiFi
developer question and more a Java question. I have looked at it

that

way too, but, if one of you knows (particularly NiFi) best

practice, I

would very much like to hear about it.

Thanks.

Re: Reading the incoming flowfile "twice"

Posted by Russell Bateman <ru...@windofkeltia.com>.
And, also *out*?

On 3/31/20 12:35 PM, Russell Bateman wrote:
> Wait, where is *modified*from?
>
> Thanks
>
> On 3/31/20 12:24 PM, Mark Payne wrote:
>> Russ,
>>
>> OK, so then I think the pattern you’d want to follow would be something like this:
>>
>> FlowFile original = session.get();
>> if (flowFile == null) {
>>      return;
>> }
>>
>> FlowFile output = session.create(original);
>>
>> // Begin writing to ‘output flowfile'
>> output = session.write(modified, new OutputStreamCallback() {
>>      void process(OutputStream out) {
>>
>>          // read from original FlowFile
>>          session.read(original, new InputStreamCallback() {
>>            void process(InputStream in) {
>>                 copyFirstHalf(in, out);
>>            }
>>         });
>>
>>
>>          // read from original FlowFile a second time. Use a SAX parser to parse it and write to the end of the ‘output flowfile'
>>         session.read(original, new InputStreamCallback() {
>>               void process(InputStream in) {
>>                    processWithSaxParser(in,*out*);
>>               }
>>         });
>>
>>      }
>> });
>>
>> session.transfer(output, REL_SUCCESS);
>> session.remove(original);
>>
>>
>> Thanks
>> -Mark
>>
>>
>>> On Mar 31, 2020, at 2:04 PM, Russell Bateman<ru...@windofkeltia.com>  wrote:
>>>
>>> Mark,
>>>
>>> Thanks for getting back. My steps are:
>>>
>>> 1. Read the "first half" of the input stream copying it to the output stream. This is because I need to preserve the exact form of it (spacing, indentation, lines, etc.) without change whatsoever. If I
>>>
>>> 2. Reopen the stream from the beginning with a SAX parser. Its handler, which I wrote, will ignore the original part that I'm holding for sacred--everything between <document> and </document>.
>>>
>>> 3. The SAX handler writes the rest of the XML with a few changes out appending it to that same output stream on which the original "half" was written. (This does not seem to work.)
>>>
>>> I was not seeing this as "overwriting" flowfile content, but, in my tiny little mind, I imagined an input stream, which I want to read exactly a) one-half, then again, b) one-whole time, and an output stream to which I start to write by copying (a), followed by a modification of (b) yet, the whole (b) or "second half." Then I'm done. I was thinking of the input stream as from the in-coming flowfile and a separate thing from the output stream which I see as being offered to me for my use in creating a new flowfile to transfer to. I guess this is not how it works.
>>>
>>> My in-coming flowfiles can be megabytes in size. Copying to a string is not an option. Copying to a temporary file "isn't NiFi" as I understand it. I was hoping to avoid writing another processor or two to a) break up the flowfile into <document> ... </document> and (all the rest), fix (all the rest), then stitch the two back together in a later processor. I see having to coordinate the two halves of what used to be one file fraught with precarity and confusion, but I guess that's the solution I'm left with?
>>>
>>> Thanks,
>>> Russ
>>>
>>>
>>> On 3/31/20 10:23 AM, Mark Payne wrote:
>>>> Russ,
>>>>
>>>> As far as I can tell, this is working exactly as expected.
>>>>
>>>> To verify, I created a simple Integration test, as well, which I attached below.
>>>>
>>>> Let me outline what I *think* you’re trying to do here and please correct me if I’m wrong:
>>>>
>>>> 1. Read the content of the FlowFile. (Via session.read)
>>>> 2. Overwrite the content of the FlowFile. (This is done by session.write)
>>>> 3. Overwrite the content of the FlowFile again. (Via session.write)
>>>>
>>>> The third step is the part where I’m confused. You’re calling session.write() again. In the callback, you’ll receive an InputStream that contains the contents of the FlowFile (which have now been modified, per Step 2). You’re also given an OutputStream to write the new content to.
>>>> If you then return without writing anything to the OutputStream, as in the example that you attached, then yes, you’ll have erased all of the FlowFile’s content.
>>>>
>>>> It’s unclear to me exactly what you’re attempting to accomplish in the third step. It *sounds* like you’re expecting the content of the original/incoming FlowFile. But you’re not going to get that because you’ve already overwritten that FlowFile’s content. If that is what you’re trying to do, I think what you’d want to do is something more like this:
>>>>
>>>> FlowFile original = session.get();
>>>> If (original == null) {
>>>>    return;
>>>> }
>>>>
>>>> session.read(original, new InputStreamCallback() {…});
>>>>
>>>> FlowFile childFlowFile = session.create(original); // Create a ‘child’ flow file whose content is equal to the original FlowFile’s content.
>>>> session.write(childFlowFile, new StreamCallback() {…});
>>>>
>>>> // Read the original FlowFile’s content
>>>> session.read(original, new InputStreamCallback() { … });
>>>>
>>>> session.transfer(childFlowFile, REL_SUCCESS);
>>>> session.remove(original); // or transfer to an ‘original’ relationship or whatever makes sense for you.
>>>>
>>>>
>>>>
>>>> Hope this helps!
>>>> -Mark
>>>>
>>>>
>>>>
>>>>
>>>>> On Mar 30, 2020, at 4:23 PM, Russell Bateman <russ@windofkeltia.com  <ma...@windofkeltia.com>> wrote:
>>>>>
>>>>> If I haven't worn out my welcome, here is the simplified code that should demonstrate either that I have miscoded your suggestions or that the API doesn't in fact work as advertised. First, the output. The code, both JUnit test and processor are attached and the files are pretty small.
>>>>>
>>>>> Much thanks,
>>>>> Russ
>>>>>
>>>>> This is the input stream first time around (before copying) ===================================
>>>>> * * * session.read( flowfile );
>>>>>        Here's what's in input stream:
>>>>> *<cxml>**
>>>>> **<document>**
>>>>> **    This is the original document.**
>>>>> **</document>**
>>>>> **<metadata>**
>>>>> **<date_of_service>2016-06-28 13:23</date_of_service>**
>>>>> **</metadata>**
>>>>> **<demographics>**
>>>>> **<date_of_birth>1980-07-01</date_of_birth>**
>>>>> **<age>36</age>**
>>>>> **</demographics>**
>>>>> **</cxml>*
>>>>>
>>>>> And now, let's copy some of the input stream to the output stream =============================
>>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>>        Copying input stream to output stream up to </document>...
>>>>>        The output stream has in it at this point:
>>>>> *<cxml>**
>>>>> **<document>**
>>>>> **    This is the original document.**
>>>>> **</document>**
>>>>> *
>>>>> [1. When we examine the output stream, it has what we expect.]
>>>>>
>>>>> After copying, can we reopen input stream intact and does outputstream have what we think? ====
>>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>>        Here's what's in input stream:
>>>>> *<cxml>**
>>>>> **<document>**
>>>>> **    This is the original document.**
>>>>> **</document>*
>>>>>
>>>>> [2. The input stream as reported just above is truncated by exactly the content we did
>>>>>        not copy to the output stream. We expected to see the entire, original file, but the
>>>>>        second half is gone.]
>>>>>
>>>>>        Here's what's in the output stream at this point:
>>>>> * (nothing)*
>>>>>
>>>>> [3. The content we copied to the output stream has disappeared. Does it disappear simply
>>>>>      because we looked at it (printed it out here)?]
>>>>>
>>>>>
>>>>> On 3/29/20 5:05 AM, Joe Witt wrote:
>>>>>> Russell
>>>>>>
>>>>>> I recommend writing very simple code that does two successive read/write
>>>>>> operations on basic data so you can make sure the api work/as expected.
>>>>>> Then add the xml bits.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen<mi...@gmail.com>   <ma...@gmail.com>   wrote:
>>>>>>
>>>>>>> If these files are only a few MB at the most, you can also just export them
>>>>>>> to a ByteArrayOutputStream. Just a thought.
>>>>>>>
>>>>>>> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman<ru...@windofkeltia.com>   <ma...@windofkeltia.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Joe and Mike,
>>>>>>>>
>>>>>>>> Sadly, I was not able to get very far on this. It seems that the extend
>>>>>>>> to which I copy the first half of the contents of the input stream, I
>>>>>>>> lose what comes after when I try to read again, basically, the second
>>>>>>>> half comprising the <metadata>and <demographics>elements which I was
>>>>>>>> hoping to SAX-parse. Here's code and output. I have highlighted the
>>>>>>>> output to make it easier to read.
>>>>>>>>
>>>>>>>> ? <#>
>>>>>>>> |try|
>>>>>>>> |{|
>>>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>>>> |||System.out.println( ||"This is the input stream first time around
>>>>>>>> (before copying to output stream)..."| |);|
>>>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>>>> |||inputStream.close();|
>>>>>>>> |}|
>>>>>>>> |catch||( IOException e )|
>>>>>>>> |{|
>>>>>>>> |||e.printStackTrace();|
>>>>>>>> |}|
>>>>>>>> |flowfile = session.write( flowfile, ||new| |StreamCallback()|
>>>>>>>> |{|
>>>>>>>> |||@Override|
>>>>>>>> |||public| |void| |process( InputStream inputStream, OutputStream
>>>>>>>> outputStream ) ||throws| |IOException|
>>>>>>>> |||{|
>>>>>>>> |||System.out.println( ||"And now, let's copy..."| |);|
>>>>>>>> |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
>>>>>>>> outputStream );|
>>>>>>>> |||}|
>>>>>>>> |} );|
>>>>>>>> |try|
>>>>>>>> |{|
>>>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>>>> |||System.out.println( ||"This is the input stream second time around
>>>>>>>> (after copying)..."| |);|
>>>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>>>> |||inputStream.close();|
>>>>>>>> |}|
>>>>>>>> |catch||( IOException e )|
>>>>>>>> |{|
>>>>>>>> |||e.printStackTrace();|
>>>>>>>> |}|
>>>>>>>> |// ...on to SAX parser which dies because the input has been truncated
>>>>>>> to|
>>>>>>>> |// exactly what was written out to the output stream|
>>>>>>>>
>>>>>>>>
>>>>>>>> Output of above:
>>>>>>>>
>>>>>>>> This is the input stream first time around (before copying to output
>>>>>>>> stream)...
>>>>>>>> <cxml>
>>>>>>>>     <document>
>>>>>>>>       This is the original document.
>>>>>>>>     </document>
>>>>>>>>     <metadata>
>>>>>>>>       <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>>>>     </metadata>
>>>>>>>>     <demographics>
>>>>>>>>       <date_of_birth>1980-07-01</date_of_birth>
>>>>>>>>       <age>36</age>
>>>>>>>>     </demographics>
>>>>>>>> </cxml>
>>>>>>>>
>>>>>>>> And now, let's copy...
>>>>>>>> This is the input stream second time around (after copying)...
>>>>>>>> <cxml>
>>>>>>>>     <document>
>>>>>>>>       This is the original document.
>>>>>>>>     </document>
>>>>>>>> And now, we'll go on to the SAX parser...
>>>>>>>> <cxml> <document> This is the original document. </document>
>>>>>>>> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>>>>>>> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>>>>>>> document structures must start and end within the same entity.
>>>>>>>>
>>>>>>>>
>>>>>>>> I left off the code that prints, "And now, we'll go on to the SAX
>>>>>>>> parser..." It's in the next flowfile = session.write( ... ). I have unit
>>>>>>>> tests that verify the good functioning of
>>>>>>>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>>>>>>>> "file" is truncated; SAX finds the first "half" just fine, but there is
>>>>>>>> no second "half". If I comment out copying from input stream to output
>>>>>>>> stream, the error doesn't occur--the whole document is there.
>>>>>>>>
>>>>>>>> Thanks for looking at this again if you can,
>>>>>>>> Russ
>>>>>>>>
>>>>>>>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>>>>>>>> you should be able to call write as many times as you need.  just keep
>>>>>>>>> using the resulting flowfile reference into the next call.
>>>>>>>>>
>>>>>>>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com   <ma...@windofkeltia.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Mike,
>>>>>>>>>>
>>>>>>>>>> Many thanks for responding. Do you mean to say that all I have to do
>>>>>>> is
>>>>>>>>>> something like this?
>>>>>>>>>>
>>>>>>>>>>       public void onTrigger( final ProcessContext context, final
>>>>>>>>>>       ProcessSession session ) throws ProcessException
>>>>>>>>>>       {
>>>>>>>>>>          FlowFile flowfile = session.get();
>>>>>>>>>>          ...
>>>>>>>>>>
>>>>>>>>>>          // this is will be our resulting flowfile...
>>>>>>>>>>          AtomicReference< OutputStream > savedOutputStream = new
>>>>>>>>>>       AtomicReference<>();
>>>>>>>>>>
>>>>>>>>>>          /* Do some processing on the in-coming flowfile then close its
>>>>>>>>>>       input stream, but
>>>>>>>>>>           * save the output stream for continued use.
>>>>>>>>>>           */
>>>>>>>>>>       *  session.write( flowfile, new InputStreamCallback()*
>>>>>>>>>>          {
>>>>>>>>>>            @Override
>>>>>>>>>>       *    public void process( InputStream inputStream, OutputStream
>>>>>>>>>>       outputStream ) throws IOException*
>>>>>>>>>>            {
>>>>>>>>>>              savedOutputStream.set( outputStream );
>>>>>>>>>>              ...
>>>>>>>>>>
>>>>>>>>>>              // processing puts some output on the output stream...
>>>>>>>>>>              outputStream.write( etc. );
>>>>>>>>>>
>>>>>>>>>>              inputStream.close();
>>>>>>>>>>            }
>>>>>>>>>>       *  } );*
>>>>>>>>>>
>>>>>>>>>>          /* Start over doing different processing on the
>>>>>>> (same/reopened)
>>>>>>>>>>       in-coming flowfile
>>>>>>>>>>           * continuing to use the original output stream. It's our
>>>>>>>>>>       responsibility to close
>>>>>>>>>>           * the saved output stream, NiFi closes the unused output
>>>>>>> stream
>>>>>>>>>>       opened, but
>>>>>>>>>>           * ignored by us.
>>>>>>>>>>           */
>>>>>>>>>>       *  session.write( flowfile, new StreamCallback()*
>>>>>>>>>>          {
>>>>>>>>>>            @Override
>>>>>>>>>>       *    public void process( InputStream inputStream, OutputStream
>>>>>>>>>>       outputStream ) throws IOException*
>>>>>>>>>>            {
>>>>>>>>>>              outputStream = savedOutputStream.get(); // (discard the
>>>>>>> new
>>>>>>>>>>       output stream)
>>>>>>>>>>              ...
>>>>>>>>>>
>>>>>>>>>>              // processing puts (some more) output on the original
>>>>>>> output
>>>>>>>>>>       stream...
>>>>>>>>>>              outputStream.write( etc. );
>>>>>>>>>>
>>>>>>>>>>              outputStream.close();
>>>>>>>>>>            }
>>>>>>>>>>       *  } );*
>>>>>>>>>>
>>>>>>>>>>          session.transfer( flowfile, etc. );
>>>>>>>>>>       }
>>>>>>>>>>
>>>>>>>>>> I'm wondering if this will work to "discard" the new output stream
>>>>>>>>>> opened for me (the second time) and replace it with the original one
>>>>>>>>>> which was probably closed when the first call to
>>>>>>>>>> session.write()finished. What's on these streams is way too big for me
>>>>>>>>>> to put them into temporary memory, say, a ByteArrayOutputStream.
>>>>>>>>>>
>>>>>>>>>> Russ
>>>>>>>>>>
>>>>>>>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>>>>>>>> session.read(FlowFile) just gives you an InputStream. You should be
>>>>>>>> able
>>>>>>>>>> to
>>>>>>>>>>> rerun that as many times as you want provided you properly close it.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
>>>>>>>> russ@windofkeltia.com   <ma...@windofkeltia.com>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> In my custom processor, I'm using a SAX parser to process an
>>>>>>> incoming
>>>>>>>>>>>> flowfile that's in XML. Except that, this particular XML is in
>>>>>>> essence
>>>>>>>>>>>> two different files and I would like to split, read and process the
>>>>>>>>>>>> first "half", which starts a couple of lines (XML elements) into the
>>>>>>>>>>>> file) not using the SAX parser. At the end, I would stream the
>>>>>>> output
>>>>>>>> of
>>>>>>>>>>>> the first half, then the SAX-processed second half.
>>>>>>>>>>>>
>>>>>>>>>>>> So, in short:
>>>>>>>>>>>>
>>>>>>>>>>>>     1. process the incoming flowfile for the early content not using
>>>>>>>> SAX,
>>>>>>>>>>>>        but merely copying as-is; at all cost I must avoid
>>>>>>>> "reassembling"
>>>>>>>>>>>>        the first half using my SAX handler (what I'm doing now),
>>>>>>>>>>>>     2. output the first part down the output stream to the resulting
>>>>>>>>>> flowfile,
>>>>>>>>>>>>     3. (re)process the incoming flowfile using SAX (and I can just
>>>>>>> skip
>>>>>>>>>>>>        over the first bit) and spitting the result of this second
>>>>>>> part
>>>>>>>> out
>>>>>>>>>>>>        down the output stream of the resulting flowfile.
>>>>>>>>>>>>
>>>>>>>>>>>> I guess this is tantamount to asking how, in Java, I can read an
>>>>>>> input
>>>>>>>>>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>>>>>>>>>> developer question and more a Java question. I have looked at it
>>>>>>> that
>>>>>>>>>>>> way too, but, if one of you knows (particularly NiFi) best
>>>>>>> practice, I
>>>>>>>>>>>> would very much like to hear about it.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>> <ReadSplitWrite.java><ReadSplitWriteTest.java>
>


Re: Reading the incoming flowfile "twice"

Posted by Russell Bateman <ru...@windofkeltia.com>.
Let's see... Does this fix the typos the way you intended?

public void onTrigger( final ProcessContext context, final 
ProcessSession session ) throws ProcessException
{
   FlowFile original = session.get(); if( original == null ) { 
context.yield(); return; }
   FlowFile output   = session.create( original );

   // Begin writing to ‘output flowfile'
   FlowFile *modified* = session.write( output, new OutputStreamCallback()
   {
     @Override
     public void process( OutputStream out )
     {
       // read from original FlowFile
       session.read( original, new InputStreamCallback()
       {
         @Override
         public void process( InputStream in ) throws IOException
         {
           copyFirstHalf( in, out );
         }
       } );

       // read from original FlowFile a second time. Use a SAX parser to 
parse it
       // and write to the end of the ‘output flowfile'
       session.read( original, new InputStreamCallback()
       {
         @Override
         public void process( InputStream in ) throws IOException
         {
           processWithSaxParser( in, out );
         }
       } );
     }
   } );

   session.transfer( *modified*, SUCCESS );
   session.remove( original );
}

This seems very close to working for me; I don't see anything wrong and 
just need to plug in my SAX parser. This modified session is a new 
pattern for me (and a useful one).

Thanks!

On 3/31/20 12:44 PM, Russell Bateman wrote:
> (Oh, I see where *out*comes from, but not *modified*.)
>
> On 3/31/20 12:35 PM, Russell Bateman wrote:
>> Wait, where is *modified*from?
>>
>> Thanks
>>
>> On 3/31/20 12:24 PM, Mark Payne wrote:
>>> Russ,
>>>
>>> OK, so then I think the pattern you’d want to follow would be something like this:
>>>
>>> FlowFile original = session.get();
>>> if (flowFile == null) {
>>>      return;
>>> }
>>>
>>> FlowFile output = session.create(original);
>>>
>>> // Begin writing to ‘output flowfile'
>>> output = session.write(*modified*, new OutputStreamCallback() {
>>>      void process(OutputStream*out*) {
>>>
>>>          // read from original FlowFile
>>>          session.read(original, new InputStreamCallback() {
>>>            void process(InputStream in) {
>>>                 copyFirstHalf(in, out);
>>>            }
>>>         });
>>>
>>>
>>>          // read from original FlowFile a second time. Use a SAX parser to parse it and write to the end of the ‘output flowfile'
>>>         session.read(original, new InputStreamCallback() {
>>>               void process(InputStream in) {
>>>                    processWithSaxParser(in,*out*);
>>>               }
>>>         });
>>>
>>>      }
>>> });
>>>
>>> session.transfer(output, REL_SUCCESS);
>>> session.remove(original);
>>>
>>>
>>> Thanks
>>> -Mark
>>>
>>>
>>>> On Mar 31, 2020, at 2:04 PM, Russell Bateman<ru...@windofkeltia.com>  wrote:
>>>>
>>>> Mark,
>>>>
>>>> Thanks for getting back. My steps are:
>>>>
>>>> 1. Read the "first half" of the input stream copying it to the output stream. This is because I need to preserve the exact form of it (spacing, indentation, lines, etc.) without change whatsoever. If I
>>>>
>>>> 2. Reopen the stream from the beginning with a SAX parser. Its handler, which I wrote, will ignore the original part that I'm holding for sacred--everything between <document> and </document>.
>>>>
>>>> 3. The SAX handler writes the rest of the XML with a few changes out appending it to that same output stream on which the original "half" was written. (This does not seem to work.)
>>>>
>>>> I was not seeing this as "overwriting" flowfile content, but, in my tiny little mind, I imagined an input stream, which I want to read exactly a) one-half, then again, b) one-whole time, and an output stream to which I start to write by copying (a), followed by a modification of (b) yet, the whole (b) or "second half." Then I'm done. I was thinking of the input stream as from the in-coming flowfile and a separate thing from the output stream which I see as being offered to me for my use in creating a new flowfile to transfer to. I guess this is not how it works.
>>>>
>>>> My in-coming flowfiles can be megabytes in size. Copying to a string is not an option. Copying to a temporary file "isn't NiFi" as I understand it. I was hoping to avoid writing another processor or two to a) break up the flowfile into <document> ... </document> and (all the rest), fix (all the rest), then stitch the two back together in a later processor. I see having to coordinate the two halves of what used to be one file fraught with precarity and confusion, but I guess that's the solution I'm left with?
>>>>
>>>> Thanks,
>>>> Russ
>>>>
>>>>
>>>> On 3/31/20 10:23 AM, Mark Payne wrote:
>>>>> Russ,
>>>>>
>>>>> As far as I can tell, this is working exactly as expected.
>>>>>
>>>>> To verify, I created a simple Integration test, as well, which I attached below.
>>>>>
>>>>> Let me outline what I *think* you’re trying to do here and please correct me if I’m wrong:
>>>>>
>>>>> 1. Read the content of the FlowFile. (Via session.read)
>>>>> 2. Overwrite the content of the FlowFile. (This is done by session.write)
>>>>> 3. Overwrite the content of the FlowFile again. (Via session.write)
>>>>>
>>>>> The third step is the part where I’m confused. You’re calling session.write() again. In the callback, you’ll receive an InputStream that contains the contents of the FlowFile (which have now been modified, per Step 2). You’re also given an OutputStream to write the new content to.
>>>>> If you then return without writing anything to the OutputStream, as in the example that you attached, then yes, you’ll have erased all of the FlowFile’s content.
>>>>>
>>>>> It’s unclear to me exactly what you’re attempting to accomplish in the third step. It *sounds* like you’re expecting the content of the original/incoming FlowFile. But you’re not going to get that because you’ve already overwritten that FlowFile’s content. If that is what you’re trying to do, I think what you’d want to do is something more like this:
>>>>>
>>>>> FlowFile original = session.get();
>>>>> If (original == null) {
>>>>>    return;
>>>>> }
>>>>>
>>>>> session.read(original, new InputStreamCallback() {…});
>>>>>
>>>>> FlowFile childFlowFile = session.create(original); // Create a ‘child’ flow file whose content is equal to the original FlowFile’s content.
>>>>> session.write(childFlowFile, new StreamCallback() {…});
>>>>>
>>>>> // Read the original FlowFile’s content
>>>>> session.read(original, new InputStreamCallback() { … });
>>>>>
>>>>> session.transfer(childFlowFile, REL_SUCCESS);
>>>>> session.remove(original); // or transfer to an ‘original’ relationship or whatever makes sense for you.
>>>>>
>>>>>
>>>>>
>>>>> Hope this helps!
>>>>> -Mark
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>> On Mar 30, 2020, at 4:23 PM, Russell Bateman <russ@windofkeltia.com  <ma...@windofkeltia.com>> wrote:
>>>>>>
>>>>>> If I haven't worn out my welcome, here is the simplified code that should demonstrate either that I have miscoded your suggestions or that the API doesn't in fact work as advertised. First, the output. The code, both JUnit test and processor are attached and the files are pretty small.
>>>>>>
>>>>>> Much thanks,
>>>>>> Russ
>>>>>>
>>>>>> This is the input stream first time around (before copying) ===================================
>>>>>> * * * session.read( flowfile );
>>>>>>        Here's what's in input stream:
>>>>>> *<cxml>**
>>>>>> **<document>**
>>>>>> **    This is the original document.**
>>>>>> **</document>**
>>>>>> **<metadata>**
>>>>>> **<date_of_service>2016-06-28 13:23</date_of_service>**
>>>>>> **</metadata>**
>>>>>> **<demographics>**
>>>>>> **<date_of_birth>1980-07-01</date_of_birth>**
>>>>>> **<age>36</age>**
>>>>>> **</demographics>**
>>>>>> **</cxml>*
>>>>>>
>>>>>> And now, let's copy some of the input stream to the output stream =============================
>>>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>>>        Copying input stream to output stream up to </document>...
>>>>>>        The output stream has in it at this point:
>>>>>> *<cxml>**
>>>>>> **<document>**
>>>>>> **    This is the original document.**
>>>>>> **</document>**
>>>>>> *
>>>>>> [1. When we examine the output stream, it has what we expect.]
>>>>>>
>>>>>> After copying, can we reopen input stream intact and does outputstream have what we think? ====
>>>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>>>        Here's what's in input stream:
>>>>>> *<cxml>**
>>>>>> **<document>**
>>>>>> **    This is the original document.**
>>>>>> **</document>*
>>>>>>
>>>>>> [2. The input stream as reported just above is truncated by exactly the content we did
>>>>>>        not copy to the output stream. We expected to see the entire, original file, but the
>>>>>>        second half is gone.]
>>>>>>
>>>>>>        Here's what's in the output stream at this point:
>>>>>> * (nothing)*
>>>>>>
>>>>>> [3. The content we copied to the output stream has disappeared. Does it disappear simply
>>>>>>      because we looked at it (printed it out here)?]
>>>>>>
>>>>>>
>>>>>> On 3/29/20 5:05 AM, Joe Witt wrote:
>>>>>>> Russell
>>>>>>>
>>>>>>> I recommend writing very simple code that does two successive read/write
>>>>>>> operations on basic data so you can make sure the api work/as expected.
>>>>>>> Then add the xml bits.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen<mi...@gmail.com>   <ma...@gmail.com>   wrote:
>>>>>>>
>>>>>>>> If these files are only a few MB at the most, you can also just export them
>>>>>>>> to a ByteArrayOutputStream. Just a thought.
>>>>>>>>
>>>>>>>> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman<ru...@windofkeltia.com>   <ma...@windofkeltia.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Joe and Mike,
>>>>>>>>>
>>>>>>>>> Sadly, I was not able to get very far on this. It seems that the extend
>>>>>>>>> to which I copy the first half of the contents of the input stream, I
>>>>>>>>> lose what comes after when I try to read again, basically, the second
>>>>>>>>> half comprising the <metadata>and <demographics>elements which I was
>>>>>>>>> hoping to SAX-parse. Here's code and output. I have highlighted the
>>>>>>>>> output to make it easier to read.
>>>>>>>>>
>>>>>>>>> ? <#>
>>>>>>>>> |try|
>>>>>>>>> |{|
>>>>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>>>>> |||System.out.println( ||"This is the input stream first time around
>>>>>>>>> (before copying to output stream)..."| |);|
>>>>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>>>>> |||inputStream.close();|
>>>>>>>>> |}|
>>>>>>>>> |catch||( IOException e )|
>>>>>>>>> |{|
>>>>>>>>> |||e.printStackTrace();|
>>>>>>>>> |}|
>>>>>>>>> |flowfile = session.write( flowfile, ||new| |StreamCallback()|
>>>>>>>>> |{|
>>>>>>>>> |||@Override|
>>>>>>>>> |||public| |void| |process( InputStream inputStream, OutputStream
>>>>>>>>> outputStream ) ||throws| |IOException|
>>>>>>>>> |||{|
>>>>>>>>> |||System.out.println( ||"And now, let's copy..."| |);|
>>>>>>>>> |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
>>>>>>>>> outputStream );|
>>>>>>>>> |||}|
>>>>>>>>> |} );|
>>>>>>>>> |try|
>>>>>>>>> |{|
>>>>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>>>>> |||System.out.println( ||"This is the input stream second time around
>>>>>>>>> (after copying)..."| |);|
>>>>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>>>>> |||inputStream.close();|
>>>>>>>>> |}|
>>>>>>>>> |catch||( IOException e )|
>>>>>>>>> |{|
>>>>>>>>> |||e.printStackTrace();|
>>>>>>>>> |}|
>>>>>>>>> |// ...on to SAX parser which dies because the input has been truncated
>>>>>>>> to|
>>>>>>>>> |// exactly what was written out to the output stream|
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Output of above:
>>>>>>>>>
>>>>>>>>> This is the input stream first time around (before copying to output
>>>>>>>>> stream)...
>>>>>>>>> <cxml>
>>>>>>>>>     <document>
>>>>>>>>>       This is the original document.
>>>>>>>>>     </document>
>>>>>>>>>     <metadata>
>>>>>>>>>       <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>>>>>     </metadata>
>>>>>>>>>     <demographics>
>>>>>>>>>       <date_of_birth>1980-07-01</date_of_birth>
>>>>>>>>>       <age>36</age>
>>>>>>>>>     </demographics>
>>>>>>>>> </cxml>
>>>>>>>>>
>>>>>>>>> And now, let's copy...
>>>>>>>>> This is the input stream second time around (after copying)...
>>>>>>>>> <cxml>
>>>>>>>>>     <document>
>>>>>>>>>       This is the original document.
>>>>>>>>>     </document>
>>>>>>>>> And now, we'll go on to the SAX parser...
>>>>>>>>> <cxml> <document> This is the original document. </document>
>>>>>>>>> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>>>>>>>> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>>>>>>>> document structures must start and end within the same entity.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I left off the code that prints, "And now, we'll go on to the SAX
>>>>>>>>> parser..." It's in the next flowfile = session.write( ... ). I have unit
>>>>>>>>> tests that verify the good functioning of
>>>>>>>>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>>>>>>>>> "file" is truncated; SAX finds the first "half" just fine, but there is
>>>>>>>>> no second "half". If I comment out copying from input stream to output
>>>>>>>>> stream, the error doesn't occur--the whole document is there.
>>>>>>>>>
>>>>>>>>> Thanks for looking at this again if you can,
>>>>>>>>> Russ
>>>>>>>>>
>>>>>>>>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>>>>>>>>> you should be able to call write as many times as you need.  just keep
>>>>>>>>>> using the resulting flowfile reference into the next call.
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com   <ma...@windofkeltia.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Mike,
>>>>>>>>>>>
>>>>>>>>>>> Many thanks for responding. Do you mean to say that all I have to do
>>>>>>>> is
>>>>>>>>>>> something like this?
>>>>>>>>>>>
>>>>>>>>>>>       public void onTrigger( final ProcessContext context, final
>>>>>>>>>>>       ProcessSession session ) throws ProcessException
>>>>>>>>>>>       {
>>>>>>>>>>>          FlowFile flowfile = session.get();
>>>>>>>>>>>          ...
>>>>>>>>>>>
>>>>>>>>>>>          // this is will be our resulting flowfile...
>>>>>>>>>>>          AtomicReference< OutputStream > savedOutputStream = new
>>>>>>>>>>>       AtomicReference<>();
>>>>>>>>>>>
>>>>>>>>>>>          /* Do some processing on the in-coming flowfile then close its
>>>>>>>>>>>       input stream, but
>>>>>>>>>>>           * save the output stream for continued use.
>>>>>>>>>>>           */
>>>>>>>>>>>       *  session.write( flowfile, new InputStreamCallback()*
>>>>>>>>>>>          {
>>>>>>>>>>>            @Override
>>>>>>>>>>>       *    public void process( InputStream inputStream, OutputStream
>>>>>>>>>>>       outputStream ) throws IOException*
>>>>>>>>>>>            {
>>>>>>>>>>>              savedOutputStream.set( outputStream );
>>>>>>>>>>>              ...
>>>>>>>>>>>
>>>>>>>>>>>              // processing puts some output on the output stream...
>>>>>>>>>>>              outputStream.write( etc. );
>>>>>>>>>>>
>>>>>>>>>>>              inputStream.close();
>>>>>>>>>>>            }
>>>>>>>>>>>       *  } );*
>>>>>>>>>>>
>>>>>>>>>>>          /* Start over doing different processing on the
>>>>>>>> (same/reopened)
>>>>>>>>>>>       in-coming flowfile
>>>>>>>>>>>           * continuing to use the original output stream. It's our
>>>>>>>>>>>       responsibility to close
>>>>>>>>>>>           * the saved output stream, NiFi closes the unused output
>>>>>>>> stream
>>>>>>>>>>>       opened, but
>>>>>>>>>>>           * ignored by us.
>>>>>>>>>>>           */
>>>>>>>>>>>       *  session.write( flowfile, new StreamCallback()*
>>>>>>>>>>>          {
>>>>>>>>>>>            @Override
>>>>>>>>>>>       *    public void process( InputStream inputStream, OutputStream
>>>>>>>>>>>       outputStream ) throws IOException*
>>>>>>>>>>>            {
>>>>>>>>>>>              outputStream = savedOutputStream.get(); // (discard the
>>>>>>>> new
>>>>>>>>>>>       output stream)
>>>>>>>>>>>              ...
>>>>>>>>>>>
>>>>>>>>>>>              // processing puts (some more) output on the original
>>>>>>>> output
>>>>>>>>>>>       stream...
>>>>>>>>>>>              outputStream.write( etc. );
>>>>>>>>>>>
>>>>>>>>>>>              outputStream.close();
>>>>>>>>>>>            }
>>>>>>>>>>>       *  } );*
>>>>>>>>>>>
>>>>>>>>>>>          session.transfer( flowfile, etc. );
>>>>>>>>>>>       }
>>>>>>>>>>>
>>>>>>>>>>> I'm wondering if this will work to "discard" the new output stream
>>>>>>>>>>> opened for me (the second time) and replace it with the original one
>>>>>>>>>>> which was probably closed when the first call to
>>>>>>>>>>> session.write()finished. What's on these streams is way too big for me
>>>>>>>>>>> to put them into temporary memory, say, a ByteArrayOutputStream.
>>>>>>>>>>>
>>>>>>>>>>> Russ
>>>>>>>>>>>
>>>>>>>>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>>>>>>>>> session.read(FlowFile) just gives you an InputStream. You should be
>>>>>>>>> able
>>>>>>>>>>> to
>>>>>>>>>>>> rerun that as many times as you want provided you properly close it.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
>>>>>>>>> russ@windofkeltia.com   <ma...@windofkeltia.com>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> In my custom processor, I'm using a SAX parser to process an
>>>>>>>> incoming
>>>>>>>>>>>>> flowfile that's in XML. Except that, this particular XML is in
>>>>>>>> essence
>>>>>>>>>>>>> two different files and I would like to split, read and process the
>>>>>>>>>>>>> first "half", which starts a couple of lines (XML elements) into the
>>>>>>>>>>>>> file) not using the SAX parser. At the end, I would stream the
>>>>>>>> output
>>>>>>>>> of
>>>>>>>>>>>>> the first half, then the SAX-processed second half.
>>>>>>>>>>>>>
>>>>>>>>>>>>> So, in short:
>>>>>>>>>>>>>
>>>>>>>>>>>>>     1. process the incoming flowfile for the early content not using
>>>>>>>>> SAX,
>>>>>>>>>>>>>        but merely copying as-is; at all cost I must avoid
>>>>>>>>> "reassembling"
>>>>>>>>>>>>>        the first half using my SAX handler (what I'm doing now),
>>>>>>>>>>>>>     2. output the first part down the output stream to the resulting
>>>>>>>>>>> flowfile,
>>>>>>>>>>>>>     3. (re)process the incoming flowfile using SAX (and I can just
>>>>>>>> skip
>>>>>>>>>>>>>        over the first bit) and spitting the result of this second
>>>>>>>> part
>>>>>>>>> out
>>>>>>>>>>>>>        down the output stream of the resulting flowfile.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I guess this is tantamount to asking how, in Java, I can read an
>>>>>>>> input
>>>>>>>>>>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>>>>>>>>>>> developer question and more a Java question. I have looked at it
>>>>>>>> that
>>>>>>>>>>>>> way too, but, if one of you knows (particularly NiFi) best
>>>>>>>> practice, I
>>>>>>>>>>>>> would very much like to hear about it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>> <ReadSplitWrite.java><ReadSplitWriteTest.java>
>>
>


Re: Reading the incoming flowfile "twice"

Posted by Mark Payne <ma...@hotmail.com>.
Sorry, typo. ‘modified’ should have been ‘output’.

> On Mar 31, 2020, at 2:44 PM, Russell Bateman <ru...@windofkeltia.com> wrote:
> 
> (Oh, I see where *out*comes from, but not *modified*.)
> 
> On 3/31/20 12:35 PM, Russell Bateman wrote:
>> Wait, where is *modified*from?
>> 
>> Thanks
>> 
>> On 3/31/20 12:24 PM, Mark Payne wrote:
>>> Russ,
>>> 
>>> OK, so then I think the pattern you’d want to follow would be something like this:
>>> 
>>> FlowFile original = session.get();
>>> if (flowFile == null) {
>>>     return;
>>> }
>>> 
>>> FlowFile output = session.create(original);
>>> 
>>> // Begin writing to ‘output flowfile'
>>> output = session.write(*modified*, new OutputStreamCallback() {
>>>     void process(OutputStream*out*) {
>>> 
>>>         // read from original FlowFile
>>>         session.read(original, new InputStreamCallback() {
>>>           void process(InputStream in) {
>>>                copyFirstHalf(in, out);
>>>           }
>>>        });
>>> 
>>> 
>>>         // read from original FlowFile a second time. Use a SAX parser to parse it and write to the end of the ‘output flowfile'
>>>        session.read(original, new InputStreamCallback() {
>>>              void process(InputStream in) {
>>>                   processWithSaxParser(in,*out*);
>>>              }
>>>        });
>>> 
>>>     }
>>> });
>>> 
>>> session.transfer(output, REL_SUCCESS);
>>> session.remove(original);
>>> 
>>> 
>>> Thanks
>>> -Mark
>>> 
>>> 
>>>> On Mar 31, 2020, at 2:04 PM, Russell Bateman<ru...@windofkeltia.com>  wrote:
>>>> 
>>>> Mark,
>>>> 
>>>> Thanks for getting back. My steps are:
>>>> 
>>>> 1. Read the "first half" of the input stream copying it to the output stream. This is because I need to preserve the exact form of it (spacing, indentation, lines, etc.) without change whatsoever. If I
>>>> 
>>>> 2. Reopen the stream from the beginning with a SAX parser. Its handler, which I wrote, will ignore the original part that I'm holding for sacred--everything between <document> and </document>.
>>>> 
>>>> 3. The SAX handler writes the rest of the XML with a few changes out appending it to that same output stream on which the original "half" was written. (This does not seem to work.)
>>>> 
>>>> I was not seeing this as "overwriting" flowfile content, but, in my tiny little mind, I imagined an input stream, which I want to read exactly a) one-half, then again, b) one-whole time, and an output stream to which I start to write by copying (a), followed by a modification of (b) yet, the whole (b) or "second half." Then I'm done. I was thinking of the input stream as from the in-coming flowfile and a separate thing from the output stream which I see as being offered to me for my use in creating a new flowfile to transfer to. I guess this is not how it works.
>>>> 
>>>> My in-coming flowfiles can be megabytes in size. Copying to a string is not an option. Copying to a temporary file "isn't NiFi" as I understand it. I was hoping to avoid writing another processor or two to a) break up the flowfile into <document> ... </document> and (all the rest), fix (all the rest), then stitch the two back together in a later processor. I see having to coordinate the two halves of what used to be one file fraught with precarity and confusion, but I guess that's the solution I'm left with?
>>>> 
>>>> Thanks,
>>>> Russ
>>>> 
>>>> 
>>>> On 3/31/20 10:23 AM, Mark Payne wrote:
>>>>> Russ,
>>>>> 
>>>>> As far as I can tell, this is working exactly as expected.
>>>>> 
>>>>> To verify, I created a simple Integration test, as well, which I attached below.
>>>>> 
>>>>> Let me outline what I *think* you’re trying to do here and please correct me if I’m wrong:
>>>>> 
>>>>> 1. Read the content of the FlowFile. (Via session.read)
>>>>> 2. Overwrite the content of the FlowFile. (This is done by session.write)
>>>>> 3. Overwrite the content of the FlowFile again. (Via session.write)
>>>>> 
>>>>> The third step is the part where I’m confused. You’re calling session.write() again. In the callback, you’ll receive an InputStream that contains the contents of the FlowFile (which have now been modified, per Step 2). You’re also given an OutputStream to write the new content to.
>>>>> If you then return without writing anything to the OutputStream, as in the example that you attached, then yes, you’ll have erased all of the FlowFile’s content.
>>>>> 
>>>>> It’s unclear to me exactly what you’re attempting to accomplish in the third step. It *sounds* like you’re expecting the content of the original/incoming FlowFile. But you’re not going to get that because you’ve already overwritten that FlowFile’s content. If that is what you’re trying to do, I think what you’d want to do is something more like this:
>>>>> 
>>>>> FlowFile original = session.get();
>>>>> If (original == null) {
>>>>>   return;
>>>>> }
>>>>> 
>>>>> session.read(original, new InputStreamCallback() {…});
>>>>> 
>>>>> FlowFile childFlowFile = session.create(original); // Create a ‘child’ flow file whose content is equal to the original FlowFile’s content.
>>>>> session.write(childFlowFile, new StreamCallback() {…});
>>>>> 
>>>>> // Read the original FlowFile’s content
>>>>> session.read(original, new InputStreamCallback() { … });
>>>>> 
>>>>> session.transfer(childFlowFile, REL_SUCCESS);
>>>>> session.remove(original); // or transfer to an ‘original’ relationship or whatever makes sense for you.
>>>>> 
>>>>> 
>>>>> 
>>>>> Hope this helps!
>>>>> -Mark
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>>> On Mar 30, 2020, at 4:23 PM, Russell Bateman <russ@windofkeltia.com  <ma...@windofkeltia.com>> wrote:
>>>>>> 
>>>>>> If I haven't worn out my welcome, here is the simplified code that should demonstrate either that I have miscoded your suggestions or that the API doesn't in fact work as advertised. First, the output. The code, both JUnit test and processor are attached and the files are pretty small.
>>>>>> 
>>>>>> Much thanks,
>>>>>> Russ
>>>>>> 
>>>>>> This is the input stream first time around (before copying) ===================================
>>>>>> * * * session.read( flowfile );
>>>>>>       Here's what's in input stream:
>>>>>> *<cxml>**
>>>>>> **<document>**
>>>>>> **    This is the original document.**
>>>>>> **</document>**
>>>>>> **<metadata>**
>>>>>> **<date_of_service>2016-06-28 13:23</date_of_service>**
>>>>>> **</metadata>**
>>>>>> **<demographics>**
>>>>>> **<date_of_birth>1980-07-01</date_of_birth>**
>>>>>> **<age>36</age>**
>>>>>> **</demographics>**
>>>>>> **</cxml>*
>>>>>> 
>>>>>> And now, let's copy some of the input stream to the output stream =============================
>>>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>>>       Copying input stream to output stream up to </document>...
>>>>>>       The output stream has in it at this point:
>>>>>> *<cxml>**
>>>>>> **<document>**
>>>>>> **    This is the original document.**
>>>>>> **</document>**
>>>>>> *
>>>>>> [1. When we examine the output stream, it has what we expect.]
>>>>>> 
>>>>>> After copying, can we reopen input stream intact and does outputstream have what we think? ====
>>>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>>>       Here's what's in input stream:
>>>>>> *<cxml>**
>>>>>> **<document>**
>>>>>> **    This is the original document.**
>>>>>> **</document>*
>>>>>> 
>>>>>> [2. The input stream as reported just above is truncated by exactly the content we did
>>>>>>       not copy to the output stream. We expected to see the entire, original file, but the
>>>>>>       second half is gone.]
>>>>>> 
>>>>>>       Here's what's in the output stream at this point:
>>>>>> * (nothing)*
>>>>>> 
>>>>>> [3. The content we copied to the output stream has disappeared. Does it disappear simply
>>>>>>     because we looked at it (printed it out here)?]
>>>>>> 
>>>>>> 
>>>>>> On 3/29/20 5:05 AM, Joe Witt wrote:
>>>>>>> Russell
>>>>>>> 
>>>>>>> I recommend writing very simple code that does two successive read/write
>>>>>>> operations on basic data so you can make sure the api work/as expected.
>>>>>>> Then add the xml bits.
>>>>>>> 
>>>>>>> Thanks
>>>>>>> 
>>>>>>> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen<mi...@gmail.com>   <ma...@gmail.com>   wrote:
>>>>>>> 
>>>>>>>> If these files are only a few MB at the most, you can also just export them
>>>>>>>> to a ByteArrayOutputStream. Just a thought.
>>>>>>>> 
>>>>>>>> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman<ru...@windofkeltia.com>   <ma...@windofkeltia.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Joe and Mike,
>>>>>>>>> 
>>>>>>>>> Sadly, I was not able to get very far on this. It seems that the extend
>>>>>>>>> to which I copy the first half of the contents of the input stream, I
>>>>>>>>> lose what comes after when I try to read again, basically, the second
>>>>>>>>> half comprising the <metadata>and <demographics>elements which I was
>>>>>>>>> hoping to SAX-parse. Here's code and output. I have highlighted the
>>>>>>>>> output to make it easier to read.
>>>>>>>>> 
>>>>>>>>> ? <#>
>>>>>>>>> |try|
>>>>>>>>> |{|
>>>>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>>>>> |||System.out.println( ||"This is the input stream first time around
>>>>>>>>> (before copying to output stream)..."| |);|
>>>>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>>>>> |||inputStream.close();|
>>>>>>>>> |}|
>>>>>>>>> |catch||( IOException e )|
>>>>>>>>> |{|
>>>>>>>>> |||e.printStackTrace();|
>>>>>>>>> |}|
>>>>>>>>> |flowfile = session.write( flowfile, ||new| |StreamCallback()|
>>>>>>>>> |{|
>>>>>>>>> |||@Override|
>>>>>>>>> |||public| |void| |process( InputStream inputStream, OutputStream
>>>>>>>>> outputStream ) ||throws| |IOException|
>>>>>>>>> |||{|
>>>>>>>>> |||System.out.println( ||"And now, let's copy..."| |);|
>>>>>>>>> |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
>>>>>>>>> outputStream );|
>>>>>>>>> |||}|
>>>>>>>>> |} );|
>>>>>>>>> |try|
>>>>>>>>> |{|
>>>>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>>>>> |||System.out.println( ||"This is the input stream second time around
>>>>>>>>> (after copying)..."| |);|
>>>>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>>>>> |||inputStream.close();|
>>>>>>>>> |}|
>>>>>>>>> |catch||( IOException e )|
>>>>>>>>> |{|
>>>>>>>>> |||e.printStackTrace();|
>>>>>>>>> |}|
>>>>>>>>> |// ...on to SAX parser which dies because the input has been truncated
>>>>>>>> to|
>>>>>>>>> |// exactly what was written out to the output stream|
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Output of above:
>>>>>>>>> 
>>>>>>>>> This is the input stream first time around (before copying to output
>>>>>>>>> stream)...
>>>>>>>>> <cxml>
>>>>>>>>>    <document>
>>>>>>>>>      This is the original document.
>>>>>>>>>    </document>
>>>>>>>>>    <metadata>
>>>>>>>>>      <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>>>>>    </metadata>
>>>>>>>>>    <demographics>
>>>>>>>>>      <date_of_birth>1980-07-01</date_of_birth>
>>>>>>>>>      <age>36</age>
>>>>>>>>>    </demographics>
>>>>>>>>> </cxml>
>>>>>>>>> 
>>>>>>>>> And now, let's copy...
>>>>>>>>> This is the input stream second time around (after copying)...
>>>>>>>>> <cxml>
>>>>>>>>>    <document>
>>>>>>>>>      This is the original document.
>>>>>>>>>    </document>
>>>>>>>>> And now, we'll go on to the SAX parser...
>>>>>>>>> <cxml> <document> This is the original document. </document>
>>>>>>>>> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>>>>>>>> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>>>>>>>> document structures must start and end within the same entity.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> I left off the code that prints, "And now, we'll go on to the SAX
>>>>>>>>> parser..." It's in the next flowfile = session.write( ... ). I have unit
>>>>>>>>> tests that verify the good functioning of
>>>>>>>>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>>>>>>>>> "file" is truncated; SAX finds the first "half" just fine, but there is
>>>>>>>>> no second "half". If I comment out copying from input stream to output
>>>>>>>>> stream, the error doesn't occur--the whole document is there.
>>>>>>>>> 
>>>>>>>>> Thanks for looking at this again if you can,
>>>>>>>>> Russ
>>>>>>>>> 
>>>>>>>>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>>>>>>>>> you should be able to call write as many times as you need.  just keep
>>>>>>>>>> using the resulting flowfile reference into the next call.
>>>>>>>>>> 
>>>>>>>>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com   <ma...@windofkeltia.com>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Mike,
>>>>>>>>>>> 
>>>>>>>>>>> Many thanks for responding. Do you mean to say that all I have to do
>>>>>>>> is
>>>>>>>>>>> something like this?
>>>>>>>>>>> 
>>>>>>>>>>>      public void onTrigger( final ProcessContext context, final
>>>>>>>>>>>      ProcessSession session ) throws ProcessException
>>>>>>>>>>>      {
>>>>>>>>>>>         FlowFile flowfile = session.get();
>>>>>>>>>>>         ...
>>>>>>>>>>> 
>>>>>>>>>>>         // this is will be our resulting flowfile...
>>>>>>>>>>>         AtomicReference< OutputStream > savedOutputStream = new
>>>>>>>>>>>      AtomicReference<>();
>>>>>>>>>>> 
>>>>>>>>>>>         /* Do some processing on the in-coming flowfile then close its
>>>>>>>>>>>      input stream, but
>>>>>>>>>>>          * save the output stream for continued use.
>>>>>>>>>>>          */
>>>>>>>>>>>      *  session.write( flowfile, new InputStreamCallback()*
>>>>>>>>>>>         {
>>>>>>>>>>>           @Override
>>>>>>>>>>>      *    public void process( InputStream inputStream, OutputStream
>>>>>>>>>>>      outputStream ) throws IOException*
>>>>>>>>>>>           {
>>>>>>>>>>>             savedOutputStream.set( outputStream );
>>>>>>>>>>>             ...
>>>>>>>>>>> 
>>>>>>>>>>>             // processing puts some output on the output stream...
>>>>>>>>>>>             outputStream.write( etc. );
>>>>>>>>>>> 
>>>>>>>>>>>             inputStream.close();
>>>>>>>>>>>           }
>>>>>>>>>>>      *  } );*
>>>>>>>>>>> 
>>>>>>>>>>>         /* Start over doing different processing on the
>>>>>>>> (same/reopened)
>>>>>>>>>>>      in-coming flowfile
>>>>>>>>>>>          * continuing to use the original output stream. It's our
>>>>>>>>>>>      responsibility to close
>>>>>>>>>>>          * the saved output stream, NiFi closes the unused output
>>>>>>>> stream
>>>>>>>>>>>      opened, but
>>>>>>>>>>>          * ignored by us.
>>>>>>>>>>>          */
>>>>>>>>>>>      *  session.write( flowfile, new StreamCallback()*
>>>>>>>>>>>         {
>>>>>>>>>>>           @Override
>>>>>>>>>>>      *    public void process( InputStream inputStream, OutputStream
>>>>>>>>>>>      outputStream ) throws IOException*
>>>>>>>>>>>           {
>>>>>>>>>>>             outputStream = savedOutputStream.get(); // (discard the
>>>>>>>> new
>>>>>>>>>>>      output stream)
>>>>>>>>>>>             ...
>>>>>>>>>>> 
>>>>>>>>>>>             // processing puts (some more) output on the original
>>>>>>>> output
>>>>>>>>>>>      stream...
>>>>>>>>>>>             outputStream.write( etc. );
>>>>>>>>>>> 
>>>>>>>>>>>             outputStream.close();
>>>>>>>>>>>           }
>>>>>>>>>>>      *  } );*
>>>>>>>>>>> 
>>>>>>>>>>>         session.transfer( flowfile, etc. );
>>>>>>>>>>>      }
>>>>>>>>>>> 
>>>>>>>>>>> I'm wondering if this will work to "discard" the new output stream
>>>>>>>>>>> opened for me (the second time) and replace it with the original one
>>>>>>>>>>> which was probably closed when the first call to
>>>>>>>>>>> session.write()finished. What's on these streams is way too big for me
>>>>>>>>>>> to put them into temporary memory, say, a ByteArrayOutputStream.
>>>>>>>>>>> 
>>>>>>>>>>> Russ
>>>>>>>>>>> 
>>>>>>>>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>>>>>>>>> session.read(FlowFile) just gives you an InputStream. You should be
>>>>>>>>> able
>>>>>>>>>>> to
>>>>>>>>>>>> rerun that as many times as you want provided you properly close it.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
>>>>>>>>> russ@windofkeltia.com   <ma...@windofkeltia.com>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> In my custom processor, I'm using a SAX parser to process an
>>>>>>>> incoming
>>>>>>>>>>>>> flowfile that's in XML. Except that, this particular XML is in
>>>>>>>> essence
>>>>>>>>>>>>> two different files and I would like to split, read and process the
>>>>>>>>>>>>> first "half", which starts a couple of lines (XML elements) into the
>>>>>>>>>>>>> file) not using the SAX parser. At the end, I would stream the
>>>>>>>> output
>>>>>>>>> of
>>>>>>>>>>>>> the first half, then the SAX-processed second half.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> So, in short:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>    1. process the incoming flowfile for the early content not using
>>>>>>>>> SAX,
>>>>>>>>>>>>>       but merely copying as-is; at all cost I must avoid
>>>>>>>>> "reassembling"
>>>>>>>>>>>>>       the first half using my SAX handler (what I'm doing now),
>>>>>>>>>>>>>    2. output the first part down the output stream to the resulting
>>>>>>>>>>> flowfile,
>>>>>>>>>>>>>    3. (re)process the incoming flowfile using SAX (and I can just
>>>>>>>> skip
>>>>>>>>>>>>>       over the first bit) and spitting the result of this second
>>>>>>>> part
>>>>>>>>> out
>>>>>>>>>>>>>       down the output stream of the resulting flowfile.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I guess this is tantamount to asking how, in Java, I can read an
>>>>>>>> input
>>>>>>>>>>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>>>>>>>>>>> developer question and more a Java question. I have looked at it
>>>>>>>> that
>>>>>>>>>>>>> way too, but, if one of you knows (particularly NiFi) best
>>>>>>>> practice, I
>>>>>>>>>>>>> would very much like to hear about it.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>> <ReadSplitWrite.java><ReadSplitWriteTest.java>
>> 
> 


Re: Reading the incoming flowfile "twice"

Posted by Russell Bateman <ru...@windofkeltia.com>.
(Oh, I see where *out*comes from, but not *modified*.)

On 3/31/20 12:35 PM, Russell Bateman wrote:
> Wait, where is *modified*from?
>
> Thanks
>
> On 3/31/20 12:24 PM, Mark Payne wrote:
>> Russ,
>>
>> OK, so then I think the pattern you’d want to follow would be something like this:
>>
>> FlowFile original = session.get();
>> if (flowFile == null) {
>>      return;
>> }
>>
>> FlowFile output = session.create(original);
>>
>> // Begin writing to ‘output flowfile'
>> output = session.write(*modified*, new OutputStreamCallback() {
>>      void process(OutputStream*out*) {
>>
>>          // read from original FlowFile
>>          session.read(original, new InputStreamCallback() {
>>            void process(InputStream in) {
>>                 copyFirstHalf(in, out);
>>            }
>>         });
>>
>>
>>          // read from original FlowFile a second time. Use a SAX parser to parse it and write to the end of the ‘output flowfile'
>>         session.read(original, new InputStreamCallback() {
>>               void process(InputStream in) {
>>                    processWithSaxParser(in,*out*);
>>               }
>>         });
>>
>>      }
>> });
>>
>> session.transfer(output, REL_SUCCESS);
>> session.remove(original);
>>
>>
>> Thanks
>> -Mark
>>
>>
>>> On Mar 31, 2020, at 2:04 PM, Russell Bateman<ru...@windofkeltia.com>  wrote:
>>>
>>> Mark,
>>>
>>> Thanks for getting back. My steps are:
>>>
>>> 1. Read the "first half" of the input stream copying it to the output stream. This is because I need to preserve the exact form of it (spacing, indentation, lines, etc.) without change whatsoever. If I
>>>
>>> 2. Reopen the stream from the beginning with a SAX parser. Its handler, which I wrote, will ignore the original part that I'm holding for sacred--everything between <document> and </document>.
>>>
>>> 3. The SAX handler writes the rest of the XML with a few changes out appending it to that same output stream on which the original "half" was written. (This does not seem to work.)
>>>
>>> I was not seeing this as "overwriting" flowfile content, but, in my tiny little mind, I imagined an input stream, which I want to read exactly a) one-half, then again, b) one-whole time, and an output stream to which I start to write by copying (a), followed by a modification of (b) yet, the whole (b) or "second half." Then I'm done. I was thinking of the input stream as from the in-coming flowfile and a separate thing from the output stream which I see as being offered to me for my use in creating a new flowfile to transfer to. I guess this is not how it works.
>>>
>>> My in-coming flowfiles can be megabytes in size. Copying to a string is not an option. Copying to a temporary file "isn't NiFi" as I understand it. I was hoping to avoid writing another processor or two to a) break up the flowfile into <document> ... </document> and (all the rest), fix (all the rest), then stitch the two back together in a later processor. I see having to coordinate the two halves of what used to be one file fraught with precarity and confusion, but I guess that's the solution I'm left with?
>>>
>>> Thanks,
>>> Russ
>>>
>>>
>>> On 3/31/20 10:23 AM, Mark Payne wrote:
>>>> Russ,
>>>>
>>>> As far as I can tell, this is working exactly as expected.
>>>>
>>>> To verify, I created a simple Integration test, as well, which I attached below.
>>>>
>>>> Let me outline what I *think* you’re trying to do here and please correct me if I’m wrong:
>>>>
>>>> 1. Read the content of the FlowFile. (Via session.read)
>>>> 2. Overwrite the content of the FlowFile. (This is done by session.write)
>>>> 3. Overwrite the content of the FlowFile again. (Via session.write)
>>>>
>>>> The third step is the part where I’m confused. You’re calling session.write() again. In the callback, you’ll receive an InputStream that contains the contents of the FlowFile (which have now been modified, per Step 2). You’re also given an OutputStream to write the new content to.
>>>> If you then return without writing anything to the OutputStream, as in the example that you attached, then yes, you’ll have erased all of the FlowFile’s content.
>>>>
>>>> It’s unclear to me exactly what you’re attempting to accomplish in the third step. It *sounds* like you’re expecting the content of the original/incoming FlowFile. But you’re not going to get that because you’ve already overwritten that FlowFile’s content. If that is what you’re trying to do, I think what you’d want to do is something more like this:
>>>>
>>>> FlowFile original = session.get();
>>>> If (original == null) {
>>>>    return;
>>>> }
>>>>
>>>> session.read(original, new InputStreamCallback() {…});
>>>>
>>>> FlowFile childFlowFile = session.create(original); // Create a ‘child’ flow file whose content is equal to the original FlowFile’s content.
>>>> session.write(childFlowFile, new StreamCallback() {…});
>>>>
>>>> // Read the original FlowFile’s content
>>>> session.read(original, new InputStreamCallback() { … });
>>>>
>>>> session.transfer(childFlowFile, REL_SUCCESS);
>>>> session.remove(original); // or transfer to an ‘original’ relationship or whatever makes sense for you.
>>>>
>>>>
>>>>
>>>> Hope this helps!
>>>> -Mark
>>>>
>>>>
>>>>
>>>>
>>>>> On Mar 30, 2020, at 4:23 PM, Russell Bateman <russ@windofkeltia.com  <ma...@windofkeltia.com>> wrote:
>>>>>
>>>>> If I haven't worn out my welcome, here is the simplified code that should demonstrate either that I have miscoded your suggestions or that the API doesn't in fact work as advertised. First, the output. The code, both JUnit test and processor are attached and the files are pretty small.
>>>>>
>>>>> Much thanks,
>>>>> Russ
>>>>>
>>>>> This is the input stream first time around (before copying) ===================================
>>>>> * * * session.read( flowfile );
>>>>>        Here's what's in input stream:
>>>>> *<cxml>**
>>>>> **<document>**
>>>>> **    This is the original document.**
>>>>> **</document>**
>>>>> **<metadata>**
>>>>> **<date_of_service>2016-06-28 13:23</date_of_service>**
>>>>> **</metadata>**
>>>>> **<demographics>**
>>>>> **<date_of_birth>1980-07-01</date_of_birth>**
>>>>> **<age>36</age>**
>>>>> **</demographics>**
>>>>> **</cxml>*
>>>>>
>>>>> And now, let's copy some of the input stream to the output stream =============================
>>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>>        Copying input stream to output stream up to </document>...
>>>>>        The output stream has in it at this point:
>>>>> *<cxml>**
>>>>> **<document>**
>>>>> **    This is the original document.**
>>>>> **</document>**
>>>>> *
>>>>> [1. When we examine the output stream, it has what we expect.]
>>>>>
>>>>> After copying, can we reopen input stream intact and does outputstream have what we think? ====
>>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>>        Here's what's in input stream:
>>>>> *<cxml>**
>>>>> **<document>**
>>>>> **    This is the original document.**
>>>>> **</document>*
>>>>>
>>>>> [2. The input stream as reported just above is truncated by exactly the content we did
>>>>>        not copy to the output stream. We expected to see the entire, original file, but the
>>>>>        second half is gone.]
>>>>>
>>>>>        Here's what's in the output stream at this point:
>>>>> * (nothing)*
>>>>>
>>>>> [3. The content we copied to the output stream has disappeared. Does it disappear simply
>>>>>      because we looked at it (printed it out here)?]
>>>>>
>>>>>
>>>>> On 3/29/20 5:05 AM, Joe Witt wrote:
>>>>>> Russell
>>>>>>
>>>>>> I recommend writing very simple code that does two successive read/write
>>>>>> operations on basic data so you can make sure the api work/as expected.
>>>>>> Then add the xml bits.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen<mi...@gmail.com>   <ma...@gmail.com>   wrote:
>>>>>>
>>>>>>> If these files are only a few MB at the most, you can also just export them
>>>>>>> to a ByteArrayOutputStream. Just a thought.
>>>>>>>
>>>>>>> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman<ru...@windofkeltia.com>   <ma...@windofkeltia.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Joe and Mike,
>>>>>>>>
>>>>>>>> Sadly, I was not able to get very far on this. It seems that the extend
>>>>>>>> to which I copy the first half of the contents of the input stream, I
>>>>>>>> lose what comes after when I try to read again, basically, the second
>>>>>>>> half comprising the <metadata>and <demographics>elements which I was
>>>>>>>> hoping to SAX-parse. Here's code and output. I have highlighted the
>>>>>>>> output to make it easier to read.
>>>>>>>>
>>>>>>>> ? <#>
>>>>>>>> |try|
>>>>>>>> |{|
>>>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>>>> |||System.out.println( ||"This is the input stream first time around
>>>>>>>> (before copying to output stream)..."| |);|
>>>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>>>> |||inputStream.close();|
>>>>>>>> |}|
>>>>>>>> |catch||( IOException e )|
>>>>>>>> |{|
>>>>>>>> |||e.printStackTrace();|
>>>>>>>> |}|
>>>>>>>> |flowfile = session.write( flowfile, ||new| |StreamCallback()|
>>>>>>>> |{|
>>>>>>>> |||@Override|
>>>>>>>> |||public| |void| |process( InputStream inputStream, OutputStream
>>>>>>>> outputStream ) ||throws| |IOException|
>>>>>>>> |||{|
>>>>>>>> |||System.out.println( ||"And now, let's copy..."| |);|
>>>>>>>> |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
>>>>>>>> outputStream );|
>>>>>>>> |||}|
>>>>>>>> |} );|
>>>>>>>> |try|
>>>>>>>> |{|
>>>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>>>> |||System.out.println( ||"This is the input stream second time around
>>>>>>>> (after copying)..."| |);|
>>>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>>>> |||inputStream.close();|
>>>>>>>> |}|
>>>>>>>> |catch||( IOException e )|
>>>>>>>> |{|
>>>>>>>> |||e.printStackTrace();|
>>>>>>>> |}|
>>>>>>>> |// ...on to SAX parser which dies because the input has been truncated
>>>>>>> to|
>>>>>>>> |// exactly what was written out to the output stream|
>>>>>>>>
>>>>>>>>
>>>>>>>> Output of above:
>>>>>>>>
>>>>>>>> This is the input stream first time around (before copying to output
>>>>>>>> stream)...
>>>>>>>> <cxml>
>>>>>>>>     <document>
>>>>>>>>       This is the original document.
>>>>>>>>     </document>
>>>>>>>>     <metadata>
>>>>>>>>       <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>>>>     </metadata>
>>>>>>>>     <demographics>
>>>>>>>>       <date_of_birth>1980-07-01</date_of_birth>
>>>>>>>>       <age>36</age>
>>>>>>>>     </demographics>
>>>>>>>> </cxml>
>>>>>>>>
>>>>>>>> And now, let's copy...
>>>>>>>> This is the input stream second time around (after copying)...
>>>>>>>> <cxml>
>>>>>>>>     <document>
>>>>>>>>       This is the original document.
>>>>>>>>     </document>
>>>>>>>> And now, we'll go on to the SAX parser...
>>>>>>>> <cxml> <document> This is the original document. </document>
>>>>>>>> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>>>>>>> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>>>>>>> document structures must start and end within the same entity.
>>>>>>>>
>>>>>>>>
>>>>>>>> I left off the code that prints, "And now, we'll go on to the SAX
>>>>>>>> parser..." It's in the next flowfile = session.write( ... ). I have unit
>>>>>>>> tests that verify the good functioning of
>>>>>>>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>>>>>>>> "file" is truncated; SAX finds the first "half" just fine, but there is
>>>>>>>> no second "half". If I comment out copying from input stream to output
>>>>>>>> stream, the error doesn't occur--the whole document is there.
>>>>>>>>
>>>>>>>> Thanks for looking at this again if you can,
>>>>>>>> Russ
>>>>>>>>
>>>>>>>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>>>>>>>> you should be able to call write as many times as you need.  just keep
>>>>>>>>> using the resulting flowfile reference into the next call.
>>>>>>>>>
>>>>>>>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com   <ma...@windofkeltia.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Mike,
>>>>>>>>>>
>>>>>>>>>> Many thanks for responding. Do you mean to say that all I have to do
>>>>>>> is
>>>>>>>>>> something like this?
>>>>>>>>>>
>>>>>>>>>>       public void onTrigger( final ProcessContext context, final
>>>>>>>>>>       ProcessSession session ) throws ProcessException
>>>>>>>>>>       {
>>>>>>>>>>          FlowFile flowfile = session.get();
>>>>>>>>>>          ...
>>>>>>>>>>
>>>>>>>>>>          // this is will be our resulting flowfile...
>>>>>>>>>>          AtomicReference< OutputStream > savedOutputStream = new
>>>>>>>>>>       AtomicReference<>();
>>>>>>>>>>
>>>>>>>>>>          /* Do some processing on the in-coming flowfile then close its
>>>>>>>>>>       input stream, but
>>>>>>>>>>           * save the output stream for continued use.
>>>>>>>>>>           */
>>>>>>>>>>       *  session.write( flowfile, new InputStreamCallback()*
>>>>>>>>>>          {
>>>>>>>>>>            @Override
>>>>>>>>>>       *    public void process( InputStream inputStream, OutputStream
>>>>>>>>>>       outputStream ) throws IOException*
>>>>>>>>>>            {
>>>>>>>>>>              savedOutputStream.set( outputStream );
>>>>>>>>>>              ...
>>>>>>>>>>
>>>>>>>>>>              // processing puts some output on the output stream...
>>>>>>>>>>              outputStream.write( etc. );
>>>>>>>>>>
>>>>>>>>>>              inputStream.close();
>>>>>>>>>>            }
>>>>>>>>>>       *  } );*
>>>>>>>>>>
>>>>>>>>>>          /* Start over doing different processing on the
>>>>>>> (same/reopened)
>>>>>>>>>>       in-coming flowfile
>>>>>>>>>>           * continuing to use the original output stream. It's our
>>>>>>>>>>       responsibility to close
>>>>>>>>>>           * the saved output stream, NiFi closes the unused output
>>>>>>> stream
>>>>>>>>>>       opened, but
>>>>>>>>>>           * ignored by us.
>>>>>>>>>>           */
>>>>>>>>>>       *  session.write( flowfile, new StreamCallback()*
>>>>>>>>>>          {
>>>>>>>>>>            @Override
>>>>>>>>>>       *    public void process( InputStream inputStream, OutputStream
>>>>>>>>>>       outputStream ) throws IOException*
>>>>>>>>>>            {
>>>>>>>>>>              outputStream = savedOutputStream.get(); // (discard the
>>>>>>> new
>>>>>>>>>>       output stream)
>>>>>>>>>>              ...
>>>>>>>>>>
>>>>>>>>>>              // processing puts (some more) output on the original
>>>>>>> output
>>>>>>>>>>       stream...
>>>>>>>>>>              outputStream.write( etc. );
>>>>>>>>>>
>>>>>>>>>>              outputStream.close();
>>>>>>>>>>            }
>>>>>>>>>>       *  } );*
>>>>>>>>>>
>>>>>>>>>>          session.transfer( flowfile, etc. );
>>>>>>>>>>       }
>>>>>>>>>>
>>>>>>>>>> I'm wondering if this will work to "discard" the new output stream
>>>>>>>>>> opened for me (the second time) and replace it with the original one
>>>>>>>>>> which was probably closed when the first call to
>>>>>>>>>> session.write()finished. What's on these streams is way too big for me
>>>>>>>>>> to put them into temporary memory, say, a ByteArrayOutputStream.
>>>>>>>>>>
>>>>>>>>>> Russ
>>>>>>>>>>
>>>>>>>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>>>>>>>> session.read(FlowFile) just gives you an InputStream. You should be
>>>>>>>> able
>>>>>>>>>> to
>>>>>>>>>>> rerun that as many times as you want provided you properly close it.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
>>>>>>>> russ@windofkeltia.com   <ma...@windofkeltia.com>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> In my custom processor, I'm using a SAX parser to process an
>>>>>>> incoming
>>>>>>>>>>>> flowfile that's in XML. Except that, this particular XML is in
>>>>>>> essence
>>>>>>>>>>>> two different files and I would like to split, read and process the
>>>>>>>>>>>> first "half", which starts a couple of lines (XML elements) into the
>>>>>>>>>>>> file) not using the SAX parser. At the end, I would stream the
>>>>>>> output
>>>>>>>> of
>>>>>>>>>>>> the first half, then the SAX-processed second half.
>>>>>>>>>>>>
>>>>>>>>>>>> So, in short:
>>>>>>>>>>>>
>>>>>>>>>>>>     1. process the incoming flowfile for the early content not using
>>>>>>>> SAX,
>>>>>>>>>>>>        but merely copying as-is; at all cost I must avoid
>>>>>>>> "reassembling"
>>>>>>>>>>>>        the first half using my SAX handler (what I'm doing now),
>>>>>>>>>>>>     2. output the first part down the output stream to the resulting
>>>>>>>>>> flowfile,
>>>>>>>>>>>>     3. (re)process the incoming flowfile using SAX (and I can just
>>>>>>> skip
>>>>>>>>>>>>        over the first bit) and spitting the result of this second
>>>>>>> part
>>>>>>>> out
>>>>>>>>>>>>        down the output stream of the resulting flowfile.
>>>>>>>>>>>>
>>>>>>>>>>>> I guess this is tantamount to asking how, in Java, I can read an
>>>>>>> input
>>>>>>>>>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>>>>>>>>>> developer question and more a Java question. I have looked at it
>>>>>>> that
>>>>>>>>>>>> way too, but, if one of you knows (particularly NiFi) best
>>>>>>> practice, I
>>>>>>>>>>>> would very much like to hear about it.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>> <ReadSplitWrite.java><ReadSplitWriteTest.java>
>


Re: Reading the incoming flowfile "twice"

Posted by Russell Bateman <ru...@windofkeltia.com>.
Wait, where is *modified*from?

Thanks

On 3/31/20 12:24 PM, Mark Payne wrote:
> Russ,
>
> OK, so then I think the pattern you’d want to follow would be something like this:
>
> FlowFile original = session.get();
> if (flowFile == null) {
>      return;
> }
>
> FlowFile output = session.create(original);
>
> // Begin writing to ‘output flowfile'
> output = session.write(*modified*, new OutputStreamCallback() {
>      void process(OutputStream out) {
>
>          // read from original FlowFile
>          session.read(original, new InputStreamCallback() {
>            void process(InputStream in) {
>                 copyFirstHalf(in, out);
>            }
>         });
>
>
>          // read from original FlowFile a second time. Use a SAX parser to parse it and write to the end of the ‘output flowfile'
>         session.read(original, new InputStreamCallback() {
>               void process(InputStream in) {
>                    processWithSaxParser(in, out);
>               }
>         });
>
>      }
> });
>
> session.transfer(output, REL_SUCCESS);
> session.remove(original);
>
>
> Thanks
> -Mark
>
>
>> On Mar 31, 2020, at 2:04 PM, Russell Bateman <ru...@windofkeltia.com> wrote:
>>
>> Mark,
>>
>> Thanks for getting back. My steps are:
>>
>> 1. Read the "first half" of the input stream copying it to the output stream. This is because I need to preserve the exact form of it (spacing, indentation, lines, etc.) without change whatsoever. If I
>>
>> 2. Reopen the stream from the beginning with a SAX parser. Its handler, which I wrote, will ignore the original part that I'm holding for sacred--everything between <document> and </document>.
>>
>> 3. The SAX handler writes the rest of the XML with a few changes out appending it to that same output stream on which the original "half" was written. (This does not seem to work.)
>>
>> I was not seeing this as "overwriting" flowfile content, but, in my tiny little mind, I imagined an input stream, which I want to read exactly a) one-half, then again, b) one-whole time, and an output stream to which I start to write by copying (a), followed by a modification of (b) yet, the whole (b) or "second half." Then I'm done. I was thinking of the input stream as from the in-coming flowfile and a separate thing from the output stream which I see as being offered to me for my use in creating a new flowfile to transfer to. I guess this is not how it works.
>>
>> My in-coming flowfiles can be megabytes in size. Copying to a string is not an option. Copying to a temporary file "isn't NiFi" as I understand it. I was hoping to avoid writing another processor or two to a) break up the flowfile into <document> ... </document> and (all the rest), fix (all the rest), then stitch the two back together in a later processor. I see having to coordinate the two halves of what used to be one file fraught with precarity and confusion, but I guess that's the solution I'm left with?
>>
>> Thanks,
>> Russ
>>
>>
>> On 3/31/20 10:23 AM, Mark Payne wrote:
>>> Russ,
>>>
>>> As far as I can tell, this is working exactly as expected.
>>>
>>> To verify, I created a simple Integration test, as well, which I attached below.
>>>
>>> Let me outline what I *think* you’re trying to do here and please correct me if I’m wrong:
>>>
>>> 1. Read the content of the FlowFile. (Via session.read)
>>> 2. Overwrite the content of the FlowFile. (This is done by session.write)
>>> 3. Overwrite the content of the FlowFile again. (Via session.write)
>>>
>>> The third step is the part where I’m confused. You’re calling session.write() again. In the callback, you’ll receive an InputStream that contains the contents of the FlowFile (which have now been modified, per Step 2). You’re also given an OutputStream to write the new content to.
>>> If you then return without writing anything to the OutputStream, as in the example that you attached, then yes, you’ll have erased all of the FlowFile’s content.
>>>
>>> It’s unclear to me exactly what you’re attempting to accomplish in the third step. It *sounds* like you’re expecting the content of the original/incoming FlowFile. But you’re not going to get that because you’ve already overwritten that FlowFile’s content. If that is what you’re trying to do, I think what you’d want to do is something more like this:
>>>
>>> FlowFile original = session.get();
>>> If (original == null) {
>>>    return;
>>> }
>>>
>>> session.read(original, new InputStreamCallback() {…});
>>>
>>> FlowFile childFlowFile = session.create(original); // Create a ‘child’ flow file whose content is equal to the original FlowFile’s content.
>>> session.write(childFlowFile, new StreamCallback() {…});
>>>
>>> // Read the original FlowFile’s content
>>> session.read(original, new InputStreamCallback() { … });
>>>
>>> session.transfer(childFlowFile, REL_SUCCESS);
>>> session.remove(original); // or transfer to an ‘original’ relationship or whatever makes sense for you.
>>>
>>>
>>>
>>> Hope this helps!
>>> -Mark
>>>
>>>
>>>
>>>
>>>> On Mar 30, 2020, at 4:23 PM, Russell Bateman <russ@windofkeltia.com <ma...@windofkeltia.com>> wrote:
>>>>
>>>> If I haven't worn out my welcome, here is the simplified code that should demonstrate either that I have miscoded your suggestions or that the API doesn't in fact work as advertised. First, the output. The code, both JUnit test and processor are attached and the files are pretty small.
>>>>
>>>> Much thanks,
>>>> Russ
>>>>
>>>> This is the input stream first time around (before copying) ===================================
>>>> * * * session.read( flowfile );
>>>>        Here's what's in input stream:
>>>> *<cxml>**
>>>> **<document>**
>>>> **    This is the original document.**
>>>> **</document>**
>>>> **<metadata>**
>>>> **<date_of_service>2016-06-28 13:23</date_of_service>**
>>>> **</metadata>**
>>>> **<demographics>**
>>>> **<date_of_birth>1980-07-01</date_of_birth>**
>>>> **<age>36</age>**
>>>> **</demographics>**
>>>> **</cxml>*
>>>>
>>>> And now, let's copy some of the input stream to the output stream =============================
>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>        Copying input stream to output stream up to </document>...
>>>>        The output stream has in it at this point:
>>>> *<cxml>**
>>>> **<document>**
>>>> **    This is the original document.**
>>>> **</document>**
>>>> *
>>>> [1. When we examine the output stream, it has what we expect.]
>>>>
>>>> After copying, can we reopen input stream intact and does outputstream have what we think? ====
>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>        Here's what's in input stream:
>>>> *<cxml>**
>>>> **<document>**
>>>> **    This is the original document.**
>>>> **</document>*
>>>>
>>>> [2. The input stream as reported just above is truncated by exactly the content we did
>>>>        not copy to the output stream. We expected to see the entire, original file, but the
>>>>        second half is gone.]
>>>>
>>>>        Here's what's in the output stream at this point:
>>>> * (nothing)*
>>>>
>>>> [3. The content we copied to the output stream has disappeared. Does it disappear simply
>>>>      because we looked at it (printed it out here)?]
>>>>
>>>>
>>>> On 3/29/20 5:05 AM, Joe Witt wrote:
>>>>> Russell
>>>>>
>>>>> I recommend writing very simple code that does two successive read/write
>>>>> operations on basic data so you can make sure the api work/as expected.
>>>>> Then add the xml bits.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen<mi...@gmail.com>  <ma...@gmail.com>  wrote:
>>>>>
>>>>>> If these files are only a few MB at the most, you can also just export them
>>>>>> to a ByteArrayOutputStream. Just a thought.
>>>>>>
>>>>>> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman<ru...@windofkeltia.com>  <ma...@windofkeltia.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Joe and Mike,
>>>>>>>
>>>>>>> Sadly, I was not able to get very far on this. It seems that the extend
>>>>>>> to which I copy the first half of the contents of the input stream, I
>>>>>>> lose what comes after when I try to read again, basically, the second
>>>>>>> half comprising the <metadata>and <demographics>elements which I was
>>>>>>> hoping to SAX-parse. Here's code and output. I have highlighted the
>>>>>>> output to make it easier to read.
>>>>>>>
>>>>>>> ? <#>
>>>>>>> |try|
>>>>>>> |{|
>>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>>> |||System.out.println( ||"This is the input stream first time around
>>>>>>> (before copying to output stream)..."| |);|
>>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>>> |||inputStream.close();|
>>>>>>> |}|
>>>>>>> |catch||( IOException e )|
>>>>>>> |{|
>>>>>>> |||e.printStackTrace();|
>>>>>>> |}|
>>>>>>> |flowfile = session.write( flowfile, ||new| |StreamCallback()|
>>>>>>> |{|
>>>>>>> |||@Override|
>>>>>>> |||public| |void| |process( InputStream inputStream, OutputStream
>>>>>>> outputStream ) ||throws| |IOException|
>>>>>>> |||{|
>>>>>>> |||System.out.println( ||"And now, let's copy..."| |);|
>>>>>>> |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
>>>>>>> outputStream );|
>>>>>>> |||}|
>>>>>>> |} );|
>>>>>>> |try|
>>>>>>> |{|
>>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>>> |||System.out.println( ||"This is the input stream second time around
>>>>>>> (after copying)..."| |);|
>>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>>> |||inputStream.close();|
>>>>>>> |}|
>>>>>>> |catch||( IOException e )|
>>>>>>> |{|
>>>>>>> |||e.printStackTrace();|
>>>>>>> |}|
>>>>>>> |// ...on to SAX parser which dies because the input has been truncated
>>>>>> to|
>>>>>>> |// exactly what was written out to the output stream|
>>>>>>>
>>>>>>>
>>>>>>> Output of above:
>>>>>>>
>>>>>>> This is the input stream first time around (before copying to output
>>>>>>> stream)...
>>>>>>> <cxml>
>>>>>>>     <document>
>>>>>>>       This is the original document.
>>>>>>>     </document>
>>>>>>>     <metadata>
>>>>>>>       <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>>>     </metadata>
>>>>>>>     <demographics>
>>>>>>>       <date_of_birth>1980-07-01</date_of_birth>
>>>>>>>       <age>36</age>
>>>>>>>     </demographics>
>>>>>>> </cxml>
>>>>>>>
>>>>>>> And now, let's copy...
>>>>>>> This is the input stream second time around (after copying)...
>>>>>>> <cxml>
>>>>>>>     <document>
>>>>>>>       This is the original document.
>>>>>>>     </document>
>>>>>>> And now, we'll go on to the SAX parser...
>>>>>>> <cxml> <document> This is the original document. </document>
>>>>>>> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>>>>>> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>>>>>> document structures must start and end within the same entity.
>>>>>>>
>>>>>>>
>>>>>>> I left off the code that prints, "And now, we'll go on to the SAX
>>>>>>> parser..." It's in the next flowfile = session.write( ... ). I have unit
>>>>>>> tests that verify the good functioning of
>>>>>>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>>>>>>> "file" is truncated; SAX finds the first "half" just fine, but there is
>>>>>>> no second "half". If I comment out copying from input stream to output
>>>>>>> stream, the error doesn't occur--the whole document is there.
>>>>>>>
>>>>>>> Thanks for looking at this again if you can,
>>>>>>> Russ
>>>>>>>
>>>>>>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>>>>>>> you should be able to call write as many times as you need.  just keep
>>>>>>>> using the resulting flowfile reference into the next call.
>>>>>>>>
>>>>>>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com  <ma...@windofkeltia.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Mike,
>>>>>>>>>
>>>>>>>>> Many thanks for responding. Do you mean to say that all I have to do
>>>>>> is
>>>>>>>>> something like this?
>>>>>>>>>
>>>>>>>>>       public void onTrigger( final ProcessContext context, final
>>>>>>>>>       ProcessSession session ) throws ProcessException
>>>>>>>>>       {
>>>>>>>>>          FlowFile flowfile = session.get();
>>>>>>>>>          ...
>>>>>>>>>
>>>>>>>>>          // this is will be our resulting flowfile...
>>>>>>>>>          AtomicReference< OutputStream > savedOutputStream = new
>>>>>>>>>       AtomicReference<>();
>>>>>>>>>
>>>>>>>>>          /* Do some processing on the in-coming flowfile then close its
>>>>>>>>>       input stream, but
>>>>>>>>>           * save the output stream for continued use.
>>>>>>>>>           */
>>>>>>>>>       *  session.write( flowfile, new InputStreamCallback()*
>>>>>>>>>          {
>>>>>>>>>            @Override
>>>>>>>>>       *    public void process( InputStream inputStream, OutputStream
>>>>>>>>>       outputStream ) throws IOException*
>>>>>>>>>            {
>>>>>>>>>              savedOutputStream.set( outputStream );
>>>>>>>>>              ...
>>>>>>>>>
>>>>>>>>>              // processing puts some output on the output stream...
>>>>>>>>>              outputStream.write( etc. );
>>>>>>>>>
>>>>>>>>>              inputStream.close();
>>>>>>>>>            }
>>>>>>>>>       *  } );*
>>>>>>>>>
>>>>>>>>>          /* Start over doing different processing on the
>>>>>> (same/reopened)
>>>>>>>>>       in-coming flowfile
>>>>>>>>>           * continuing to use the original output stream. It's our
>>>>>>>>>       responsibility to close
>>>>>>>>>           * the saved output stream, NiFi closes the unused output
>>>>>> stream
>>>>>>>>>       opened, but
>>>>>>>>>           * ignored by us.
>>>>>>>>>           */
>>>>>>>>>       *  session.write( flowfile, new StreamCallback()*
>>>>>>>>>          {
>>>>>>>>>            @Override
>>>>>>>>>       *    public void process( InputStream inputStream, OutputStream
>>>>>>>>>       outputStream ) throws IOException*
>>>>>>>>>            {
>>>>>>>>>              outputStream = savedOutputStream.get(); // (discard the
>>>>>> new
>>>>>>>>>       output stream)
>>>>>>>>>              ...
>>>>>>>>>
>>>>>>>>>              // processing puts (some more) output on the original
>>>>>> output
>>>>>>>>>       stream...
>>>>>>>>>              outputStream.write( etc. );
>>>>>>>>>
>>>>>>>>>              outputStream.close();
>>>>>>>>>            }
>>>>>>>>>       *  } );*
>>>>>>>>>
>>>>>>>>>          session.transfer( flowfile, etc. );
>>>>>>>>>       }
>>>>>>>>>
>>>>>>>>> I'm wondering if this will work to "discard" the new output stream
>>>>>>>>> opened for me (the second time) and replace it with the original one
>>>>>>>>> which was probably closed when the first call to
>>>>>>>>> session.write()finished. What's on these streams is way too big for me
>>>>>>>>> to put them into temporary memory, say, a ByteArrayOutputStream.
>>>>>>>>>
>>>>>>>>> Russ
>>>>>>>>>
>>>>>>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>>>>>>> session.read(FlowFile) just gives you an InputStream. You should be
>>>>>>> able
>>>>>>>>> to
>>>>>>>>>> rerun that as many times as you want provided you properly close it.
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
>>>>>>> russ@windofkeltia.com  <ma...@windofkeltia.com>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> In my custom processor, I'm using a SAX parser to process an
>>>>>> incoming
>>>>>>>>>>> flowfile that's in XML. Except that, this particular XML is in
>>>>>> essence
>>>>>>>>>>> two different files and I would like to split, read and process the
>>>>>>>>>>> first "half", which starts a couple of lines (XML elements) into the
>>>>>>>>>>> file) not using the SAX parser. At the end, I would stream the
>>>>>> output
>>>>>>> of
>>>>>>>>>>> the first half, then the SAX-processed second half.
>>>>>>>>>>>
>>>>>>>>>>> So, in short:
>>>>>>>>>>>
>>>>>>>>>>>     1. process the incoming flowfile for the early content not using
>>>>>>> SAX,
>>>>>>>>>>>        but merely copying as-is; at all cost I must avoid
>>>>>>> "reassembling"
>>>>>>>>>>>        the first half using my SAX handler (what I'm doing now),
>>>>>>>>>>>     2. output the first part down the output stream to the resulting
>>>>>>>>> flowfile,
>>>>>>>>>>>     3. (re)process the incoming flowfile using SAX (and I can just
>>>>>> skip
>>>>>>>>>>>        over the first bit) and spitting the result of this second
>>>>>> part
>>>>>>> out
>>>>>>>>>>>        down the output stream of the resulting flowfile.
>>>>>>>>>>>
>>>>>>>>>>> I guess this is tantamount to asking how, in Java, I can read an
>>>>>> input
>>>>>>>>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>>>>>>>>> developer question and more a Java question. I have looked at it
>>>>>> that
>>>>>>>>>>> way too, but, if one of you knows (particularly NiFi) best
>>>>>> practice, I
>>>>>>>>>>> would very much like to hear about it.
>>>>>>>>>>>
>>>>>>>>>>> Thanks.
>>>>>>>>>>>
>>>>>>>>>>>
>>>> <ReadSplitWrite.java><ReadSplitWriteTest.java>


Re: Reading the incoming flowfile "twice"

Posted by Mark Payne <ma...@hotmail.com>.
Russ,

OK, so then I think the pattern you’d want to follow would be something like this:

FlowFile original = session.get();
if (flowFile == null) {
    return;
}

FlowFile output = session.create(original);

// Begin writing to ‘output flowfile'
output = session.write(modified, new OutputStreamCallback() {
    void process(OutputStream out) {

        // read from original FlowFile
        session.read(original, new InputStreamCallback() {
          void process(InputStream in) {
               copyFirstHalf(in, out);
          }
       });


        // read from original FlowFile a second time. Use a SAX parser to parse it and write to the end of the ‘output flowfile'
       session.read(original, new InputStreamCallback() {
             void process(InputStream in) {
                  processWithSaxParser(in, out);
             }
       });

    }
});

session.transfer(output, REL_SUCCESS);
session.remove(original);


Thanks
-Mark


> On Mar 31, 2020, at 2:04 PM, Russell Bateman <ru...@windofkeltia.com> wrote:
> 
> Mark,
> 
> Thanks for getting back. My steps are:
> 
> 1. Read the "first half" of the input stream copying it to the output stream. This is because I need to preserve the exact form of it (spacing, indentation, lines, etc.) without change whatsoever. If I
> 
> 2. Reopen the stream from the beginning with a SAX parser. Its handler, which I wrote, will ignore the original part that I'm holding for sacred--everything between <document> and </document>.
> 
> 3. The SAX handler writes the rest of the XML with a few changes out appending it to that same output stream on which the original "half" was written. (This does not seem to work.)
> 
> I was not seeing this as "overwriting" flowfile content, but, in my tiny little mind, I imagined an input stream, which I want to read exactly a) one-half, then again, b) one-whole time, and an output stream to which I start to write by copying (a), followed by a modification of (b) yet, the whole (b) or "second half." Then I'm done. I was thinking of the input stream as from the in-coming flowfile and a separate thing from the output stream which I see as being offered to me for my use in creating a new flowfile to transfer to. I guess this is not how it works.
> 
> My in-coming flowfiles can be megabytes in size. Copying to a string is not an option. Copying to a temporary file "isn't NiFi" as I understand it. I was hoping to avoid writing another processor or two to a) break up the flowfile into <document> ... </document> and (all the rest), fix (all the rest), then stitch the two back together in a later processor. I see having to coordinate the two halves of what used to be one file fraught with precarity and confusion, but I guess that's the solution I'm left with?
> 
> Thanks,
> Russ
> 
> 
> On 3/31/20 10:23 AM, Mark Payne wrote:
>> Russ,
>> 
>> As far as I can tell, this is working exactly as expected.
>> 
>> To verify, I created a simple Integration test, as well, which I attached below.
>> 
>> Let me outline what I *think* you’re trying to do here and please correct me if I’m wrong:
>> 
>> 1. Read the content of the FlowFile. (Via session.read)
>> 2. Overwrite the content of the FlowFile. (This is done by session.write)
>> 3. Overwrite the content of the FlowFile again. (Via session.write)
>> 
>> The third step is the part where I’m confused. You’re calling session.write() again. In the callback, you’ll receive an InputStream that contains the contents of the FlowFile (which have now been modified, per Step 2). You’re also given an OutputStream to write the new content to.
>> If you then return without writing anything to the OutputStream, as in the example that you attached, then yes, you’ll have erased all of the FlowFile’s content.
>> 
>> It’s unclear to me exactly what you’re attempting to accomplish in the third step. It *sounds* like you’re expecting the content of the original/incoming FlowFile. But you’re not going to get that because you’ve already overwritten that FlowFile’s content. If that is what you’re trying to do, I think what you’d want to do is something more like this:
>> 
>> FlowFile original = session.get();
>> If (original == null) {
>>   return;
>> }
>> 
>> session.read(original, new InputStreamCallback() {…});
>> 
>> FlowFile childFlowFile = session.create(original); // Create a ‘child’ flow file whose content is equal to the original FlowFile’s content.
>> session.write(childFlowFile, new StreamCallback() {…});
>> 
>> // Read the original FlowFile’s content
>> session.read(original, new InputStreamCallback() { … });
>> 
>> session.transfer(childFlowFile, REL_SUCCESS);
>> session.remove(original); // or transfer to an ‘original’ relationship or whatever makes sense for you.
>> 
>> 
>> 
>> Hope this helps!
>> -Mark
>> 
>> 
>> 
>> 
>>> On Mar 30, 2020, at 4:23 PM, Russell Bateman <russ@windofkeltia.com <ma...@windofkeltia.com>> wrote:
>>> 
>>> If I haven't worn out my welcome, here is the simplified code that should demonstrate either that I have miscoded your suggestions or that the API doesn't in fact work as advertised. First, the output. The code, both JUnit test and processor are attached and the files are pretty small.
>>> 
>>> Much thanks,
>>> Russ
>>> 
>>> This is the input stream first time around (before copying) ===================================
>>> * * * session.read( flowfile );
>>>       Here's what's in input stream:
>>> *<cxml>**
>>> **<document>**
>>> **    This is the original document.**
>>> **</document>**
>>> **<metadata>**
>>> **<date_of_service>2016-06-28 13:23</date_of_service>**
>>> **</metadata>**
>>> **<demographics>**
>>> **<date_of_birth>1980-07-01</date_of_birth>**
>>> **<age>36</age>**
>>> **</demographics>**
>>> **</cxml>*
>>> 
>>> And now, let's copy some of the input stream to the output stream =============================
>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>       Copying input stream to output stream up to </document>...
>>>       The output stream has in it at this point:
>>> *<cxml>**
>>> **<document>**
>>> **    This is the original document.**
>>> **</document>**
>>> *
>>> [1. When we examine the output stream, it has what we expect.]
>>> 
>>> After copying, can we reopen input stream intact and does outputstream have what we think? ====
>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>       Here's what's in input stream:
>>> *<cxml>**
>>> **<document>**
>>> **    This is the original document.**
>>> **</document>*
>>> 
>>> [2. The input stream as reported just above is truncated by exactly the content we did
>>>       not copy to the output stream. We expected to see the entire, original file, but the
>>>       second half is gone.]
>>> 
>>>       Here's what's in the output stream at this point:
>>> * (nothing)*
>>> 
>>> [3. The content we copied to the output stream has disappeared. Does it disappear simply
>>>     because we looked at it (printed it out here)?]
>>> 
>>> 
>>> On 3/29/20 5:05 AM, Joe Witt wrote:
>>>> Russell
>>>> 
>>>> I recommend writing very simple code that does two successive read/write
>>>> operations on basic data so you can make sure the api work/as expected.
>>>> Then add the xml bits.
>>>> 
>>>> Thanks
>>>> 
>>>> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen<mi...@gmail.com>  <ma...@gmail.com>  wrote:
>>>> 
>>>>> If these files are only a few MB at the most, you can also just export them
>>>>> to a ByteArrayOutputStream. Just a thought.
>>>>> 
>>>>> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman<ru...@windofkeltia.com>  <ma...@windofkeltia.com>
>>>>> wrote:
>>>>> 
>>>>>> Joe and Mike,
>>>>>> 
>>>>>> Sadly, I was not able to get very far on this. It seems that the extend
>>>>>> to which I copy the first half of the contents of the input stream, I
>>>>>> lose what comes after when I try to read again, basically, the second
>>>>>> half comprising the <metadata>and <demographics>elements which I was
>>>>>> hoping to SAX-parse. Here's code and output. I have highlighted the
>>>>>> output to make it easier to read.
>>>>>> 
>>>>>> ? <#>
>>>>>> |try|
>>>>>> |{|
>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>> |||System.out.println( ||"This is the input stream first time around
>>>>>> (before copying to output stream)..."| |);|
>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>> |||inputStream.close();|
>>>>>> |}|
>>>>>> |catch||( IOException e )|
>>>>>> |{|
>>>>>> |||e.printStackTrace();|
>>>>>> |}|
>>>>>> |flowfile = session.write( flowfile, ||new| |StreamCallback()|
>>>>>> |{|
>>>>>> |||@Override|
>>>>>> |||public| |void| |process( InputStream inputStream, OutputStream
>>>>>> outputStream ) ||throws| |IOException|
>>>>>> |||{|
>>>>>> |||System.out.println( ||"And now, let's copy..."| |);|
>>>>>> |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
>>>>>> outputStream );|
>>>>>> |||}|
>>>>>> |} );|
>>>>>> |try|
>>>>>> |{|
>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>> |||System.out.println( ||"This is the input stream second time around
>>>>>> (after copying)..."| |);|
>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>> |||inputStream.close();|
>>>>>> |}|
>>>>>> |catch||( IOException e )|
>>>>>> |{|
>>>>>> |||e.printStackTrace();|
>>>>>> |}|
>>>>>> |// ...on to SAX parser which dies because the input has been truncated
>>>>> to|
>>>>>> |// exactly what was written out to the output stream|
>>>>>> 
>>>>>> 
>>>>>> Output of above:
>>>>>> 
>>>>>> This is the input stream first time around (before copying to output
>>>>>> stream)...
>>>>>> <cxml>
>>>>>>    <document>
>>>>>>      This is the original document.
>>>>>>    </document>
>>>>>>    <metadata>
>>>>>>      <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>>    </metadata>
>>>>>>    <demographics>
>>>>>>      <date_of_birth>1980-07-01</date_of_birth>
>>>>>>      <age>36</age>
>>>>>>    </demographics>
>>>>>> </cxml>
>>>>>> 
>>>>>> And now, let's copy...
>>>>>> This is the input stream second time around (after copying)...
>>>>>> <cxml>
>>>>>>    <document>
>>>>>>      This is the original document.
>>>>>>    </document>
>>>>>> And now, we'll go on to the SAX parser...
>>>>>> <cxml> <document> This is the original document. </document>
>>>>>> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>>>>> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>>>>> document structures must start and end within the same entity.
>>>>>> 
>>>>>> 
>>>>>> I left off the code that prints, "And now, we'll go on to the SAX
>>>>>> parser..." It's in the next flowfile = session.write( ... ). I have unit
>>>>>> tests that verify the good functioning of
>>>>>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>>>>>> "file" is truncated; SAX finds the first "half" just fine, but there is
>>>>>> no second "half". If I comment out copying from input stream to output
>>>>>> stream, the error doesn't occur--the whole document is there.
>>>>>> 
>>>>>> Thanks for looking at this again if you can,
>>>>>> Russ
>>>>>> 
>>>>>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>>>>>> you should be able to call write as many times as you need.  just keep
>>>>>>> using the resulting flowfile reference into the next call.
>>>>>>> 
>>>>>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com  <ma...@windofkeltia.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Mike,
>>>>>>>> 
>>>>>>>> Many thanks for responding. Do you mean to say that all I have to do
>>>>> is
>>>>>>>> something like this?
>>>>>>>> 
>>>>>>>>      public void onTrigger( final ProcessContext context, final
>>>>>>>>      ProcessSession session ) throws ProcessException
>>>>>>>>      {
>>>>>>>>         FlowFile flowfile = session.get();
>>>>>>>>         ...
>>>>>>>> 
>>>>>>>>         // this is will be our resulting flowfile...
>>>>>>>>         AtomicReference< OutputStream > savedOutputStream = new
>>>>>>>>      AtomicReference<>();
>>>>>>>> 
>>>>>>>>         /* Do some processing on the in-coming flowfile then close its
>>>>>>>>      input stream, but
>>>>>>>>          * save the output stream for continued use.
>>>>>>>>          */
>>>>>>>>      *  session.write( flowfile, new InputStreamCallback()*
>>>>>>>>         {
>>>>>>>>           @Override
>>>>>>>>      *    public void process( InputStream inputStream, OutputStream
>>>>>>>>      outputStream ) throws IOException*
>>>>>>>>           {
>>>>>>>>             savedOutputStream.set( outputStream );
>>>>>>>>             ...
>>>>>>>> 
>>>>>>>>             // processing puts some output on the output stream...
>>>>>>>>             outputStream.write( etc. );
>>>>>>>> 
>>>>>>>>             inputStream.close();
>>>>>>>>           }
>>>>>>>>      *  } );*
>>>>>>>> 
>>>>>>>>         /* Start over doing different processing on the
>>>>> (same/reopened)
>>>>>>>>      in-coming flowfile
>>>>>>>>          * continuing to use the original output stream. It's our
>>>>>>>>      responsibility to close
>>>>>>>>          * the saved output stream, NiFi closes the unused output
>>>>> stream
>>>>>>>>      opened, but
>>>>>>>>          * ignored by us.
>>>>>>>>          */
>>>>>>>>      *  session.write( flowfile, new StreamCallback()*
>>>>>>>>         {
>>>>>>>>           @Override
>>>>>>>>      *    public void process( InputStream inputStream, OutputStream
>>>>>>>>      outputStream ) throws IOException*
>>>>>>>>           {
>>>>>>>>             outputStream = savedOutputStream.get(); // (discard the
>>>>> new
>>>>>>>>      output stream)
>>>>>>>>             ...
>>>>>>>> 
>>>>>>>>             // processing puts (some more) output on the original
>>>>> output
>>>>>>>>      stream...
>>>>>>>>             outputStream.write( etc. );
>>>>>>>> 
>>>>>>>>             outputStream.close();
>>>>>>>>           }
>>>>>>>>      *  } );*
>>>>>>>> 
>>>>>>>>         session.transfer( flowfile, etc. );
>>>>>>>>      }
>>>>>>>> 
>>>>>>>> I'm wondering if this will work to "discard" the new output stream
>>>>>>>> opened for me (the second time) and replace it with the original one
>>>>>>>> which was probably closed when the first call to
>>>>>>>> session.write()finished. What's on these streams is way too big for me
>>>>>>>> to put them into temporary memory, say, a ByteArrayOutputStream.
>>>>>>>> 
>>>>>>>> Russ
>>>>>>>> 
>>>>>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>>>>>> session.read(FlowFile) just gives you an InputStream. You should be
>>>>>> able
>>>>>>>> to
>>>>>>>>> rerun that as many times as you want provided you properly close it.
>>>>>>>>> 
>>>>>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
>>>>>> russ@windofkeltia.com  <ma...@windofkeltia.com>>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> In my custom processor, I'm using a SAX parser to process an
>>>>> incoming
>>>>>>>>>> flowfile that's in XML. Except that, this particular XML is in
>>>>> essence
>>>>>>>>>> two different files and I would like to split, read and process the
>>>>>>>>>> first "half", which starts a couple of lines (XML elements) into the
>>>>>>>>>> file) not using the SAX parser. At the end, I would stream the
>>>>> output
>>>>>> of
>>>>>>>>>> the first half, then the SAX-processed second half.
>>>>>>>>>> 
>>>>>>>>>> So, in short:
>>>>>>>>>> 
>>>>>>>>>>    1. process the incoming flowfile for the early content not using
>>>>>> SAX,
>>>>>>>>>>       but merely copying as-is; at all cost I must avoid
>>>>>> "reassembling"
>>>>>>>>>>       the first half using my SAX handler (what I'm doing now),
>>>>>>>>>>    2. output the first part down the output stream to the resulting
>>>>>>>> flowfile,
>>>>>>>>>>    3. (re)process the incoming flowfile using SAX (and I can just
>>>>> skip
>>>>>>>>>>       over the first bit) and spitting the result of this second
>>>>> part
>>>>>> out
>>>>>>>>>>       down the output stream of the resulting flowfile.
>>>>>>>>>> 
>>>>>>>>>> I guess this is tantamount to asking how, in Java, I can read an
>>>>> input
>>>>>>>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>>>>>>>> developer question and more a Java question. I have looked at it
>>>>> that
>>>>>>>>>> way too, but, if one of you knows (particularly NiFi) best
>>>>> practice, I
>>>>>>>>>> would very much like to hear about it.
>>>>>>>>>> 
>>>>>>>>>> Thanks.
>>>>>>>>>> 
>>>>>>>>>> 
>>> 
>>> <ReadSplitWrite.java><ReadSplitWriteTest.java>
>> 
> 


Re: Reading the incoming flowfile "twice"

Posted by Russell Bateman <ru...@windofkeltia.com>.
Mark,

Thanks for getting back. My steps are:

1. Read the "first half" of the input stream copying it to the output 
stream. This is because I need to preserve the exact form of it 
(spacing, indentation, lines, etc.) without change whatsoever. If I

2. Reopen the stream from the beginning with a SAX parser. Its handler, 
which I wrote, will ignore the original part that I'm holding for 
sacred--everything between <document> and </document>.

3. The SAX handler writes the rest of the XML with a few changes out 
appending it to that same output stream on which the original "half" was 
written. (This does not seem to work.)

I was not seeing this as "overwriting" flowfile content, but, in my tiny 
little mind, I imagined an input stream, which I want to read exactly a) 
one-half, then again, b) one-whole time, and an output stream to which I 
start to write by copying (a), followed by a modification of (b) yet, 
the whole (b) or "second half." Then I'm done. I was thinking of the 
input stream as from the in-coming flowfile and a separate thing from 
the output stream which I see as being offered to me for my use in 
creating a new flowfile to transfer to. I guess this is not how it works.

My in-coming flowfiles can be megabytes in size. Copying to a string is 
not an option. Copying to a temporary file "isn't NiFi" as I understand 
it. I was hoping to avoid writing another processor or two to a) break 
up the flowfile into <document> ... </document> and (all the rest), fix 
(all the rest), then stitch the two back together in a later processor. 
I see having to coordinate the two halves of what used to be one file 
fraught with precarity and confusion, but I guess that's the solution 
I'm left with?

Thanks,
Russ


On 3/31/20 10:23 AM, Mark Payne wrote:
> Russ,
>
> As far as I can tell, this is working exactly as expected.
>
> To verify, I created a simple Integration test, as well, which I 
> attached below.
>
> Let me outline what I *think* you’re trying to do here and please 
> correct me if I’m wrong:
>
> 1. Read the content of the FlowFile. (Via session.read)
> 2. Overwrite the content of the FlowFile. (This is done by session.write)
> 3. Overwrite the content of the FlowFile again. (Via session.write)
>
> The third step is the part where I’m confused. You’re calling 
> session.write() again. In the callback, you’ll receive an InputStream 
> that contains the contents of the FlowFile (which have now been 
> modified, per Step 2). You’re also given an OutputStream to write the 
> new content to.
> If you then return without writing anything to the OutputStream, as in 
> the example that you attached, then yes, you’ll have erased all of the 
> FlowFile’s content.
>
> It’s unclear to me exactly what you’re attempting to accomplish in the 
> third step. It *sounds* like you’re expecting the content of the 
> original/incoming FlowFile. But you’re not going to get that because 
> you’ve already overwritten that FlowFile’s content. If that is what 
> you’re trying to do, I think what you’d want to do is something more 
> like this:
>
> FlowFile original = session.get();
> If (original == null) {
>   return;
> }
>
> session.read(original, new InputStreamCallback() {…});
>
> FlowFile childFlowFile = session.create(original); // Create a ‘child’ 
> flow file whose content is equal to the original FlowFile’s content.
> session.write(childFlowFile, new StreamCallback() {…});
>
> // Read the original FlowFile’s content
> session.read(original, new InputStreamCallback() { … });
>
> session.transfer(childFlowFile, REL_SUCCESS);
> session.remove(original); // or transfer to an ‘original’ relationship 
> or whatever makes sense for you.
>
>
>
> Hope this helps!
> -Mark
>
>
>
>
>> On Mar 30, 2020, at 4:23 PM, Russell Bateman <russ@windofkeltia.com 
>> <ma...@windofkeltia.com>> wrote:
>>
>> If I haven't worn out my welcome, here is the simplified code that 
>> should demonstrate either that I have miscoded your suggestions or 
>> that the API doesn't in fact work as advertised. First, the output. 
>> The code, both JUnit test and processor are attached and the files 
>> are pretty small.
>>
>> Much thanks,
>> Russ
>>
>> This is the input stream first time around (before copying) 
>> ===================================
>> * * * session.read( flowfile );
>>       Here's what's in input stream:
>> *<cxml>**
>> **<document>**
>> **    This is the original document.**
>> **</document>**
>> **<metadata>**
>> **<date_of_service>2016-06-28 13:23</date_of_service>**
>> **</metadata>**
>> **<demographics>**
>> **<date_of_birth>1980-07-01</date_of_birth>**
>> **<age>36</age>**
>> **</demographics>**
>> **</cxml>*
>>
>> And now, let's copy some of the input stream to the output stream 
>> =============================
>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>       Copying input stream to output stream up to </document>...
>>       The output stream has in it at this point:
>> *<cxml>**
>> **<document>**
>> **    This is the original document.**
>> **</document>**
>> *
>> [1. When we examine the output stream, it has what we expect.]
>>
>> After copying, can we reopen input stream intact and does 
>> outputstream have what we think? ====
>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>       Here's what's in input stream:
>> *<cxml>**
>> **<document>**
>> **    This is the original document.**
>> **</document>*
>>
>> [2. The input stream as reported just above is truncated by exactly 
>> the content we did
>>       not copy to the output stream. We expected to see the entire, 
>> original file, but the
>>       second half is gone.]
>>
>>       Here's what's in the output stream at this point:
>> * (nothing)*
>>
>> [3. The content we copied to the output stream has disappeared. Does 
>> it disappear simply
>>     because we looked at it (printed it out here)?]
>>
>>
>> On 3/29/20 5:05 AM, Joe Witt wrote:
>>> Russell
>>>
>>> I recommend writing very simple code that does two successive read/write
>>> operations on basic data so you can make sure the api work/as expected.
>>> Then add the xml bits.
>>>
>>> Thanks
>>>
>>> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen<mi...@gmail.com>  <ma...@gmail.com>  wrote:
>>>
>>>> If these files are only a few MB at the most, you can also just export them
>>>> to a ByteArrayOutputStream. Just a thought.
>>>>
>>>> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman<ru...@windofkeltia.com>  <ma...@windofkeltia.com>
>>>> wrote:
>>>>
>>>>> Joe and Mike,
>>>>>
>>>>> Sadly, I was not able to get very far on this. It seems that the extend
>>>>> to which I copy the first half of the contents of the input stream, I
>>>>> lose what comes after when I try to read again, basically, the second
>>>>> half comprising the <metadata>and <demographics>elements which I was
>>>>> hoping to SAX-parse. Here's code and output. I have highlighted the
>>>>> output to make it easier to read.
>>>>>
>>>>> ? <#>
>>>>> |try|
>>>>> |{|
>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>> |||System.out.println( ||"This is the input stream first time around
>>>>> (before copying to output stream)..."| |);|
>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>> |||inputStream.close();|
>>>>> |}|
>>>>> |catch||( IOException e )|
>>>>> |{|
>>>>> |||e.printStackTrace();|
>>>>> |}|
>>>>> |flowfile = session.write( flowfile, ||new| |StreamCallback()|
>>>>> |{|
>>>>> |||@Override|
>>>>> |||public| |void| |process( InputStream inputStream, OutputStream
>>>>> outputStream ) ||throws| |IOException|
>>>>> |||{|
>>>>> |||System.out.println( ||"And now, let's copy..."| |);|
>>>>> |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
>>>>> outputStream );|
>>>>> |||}|
>>>>> |} );|
>>>>> |try|
>>>>> |{|
>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>> |||System.out.println( ||"This is the input stream second time around
>>>>> (after copying)..."| |);|
>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>> |||inputStream.close();|
>>>>> |}|
>>>>> |catch||( IOException e )|
>>>>> |{|
>>>>> |||e.printStackTrace();|
>>>>> |}|
>>>>> |// ...on to SAX parser which dies because the input has been truncated
>>>> to|
>>>>> |// exactly what was written out to the output stream|
>>>>>
>>>>>
>>>>> Output of above:
>>>>>
>>>>> This is the input stream first time around (before copying to output
>>>>> stream)...
>>>>> <cxml>
>>>>>     <document>
>>>>>       This is the original document.
>>>>>     </document>
>>>>>     <metadata>
>>>>>       <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>     </metadata>
>>>>>     <demographics>
>>>>>       <date_of_birth>1980-07-01</date_of_birth>
>>>>>       <age>36</age>
>>>>>     </demographics>
>>>>> </cxml>
>>>>>
>>>>> And now, let's copy...
>>>>> This is the input stream second time around (after copying)...
>>>>> <cxml>
>>>>>     <document>
>>>>>       This is the original document.
>>>>>     </document>
>>>>> And now, we'll go on to the SAX parser...
>>>>> <cxml> <document> This is the original document. </document>
>>>>> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>>>> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>>>> document structures must start and end within the same entity.
>>>>>
>>>>>
>>>>> I left off the code that prints, "And now, we'll go on to the SAX
>>>>> parser..." It's in the next flowfile = session.write( ... ). I have unit
>>>>> tests that verify the good functioning of
>>>>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>>>>> "file" is truncated; SAX finds the first "half" just fine, but there is
>>>>> no second "half". If I comment out copying from input stream to output
>>>>> stream, the error doesn't occur--the whole document is there.
>>>>>
>>>>> Thanks for looking at this again if you can,
>>>>> Russ
>>>>>
>>>>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>>>>> you should be able to call write as many times as you need.  just keep
>>>>>> using the resulting flowfile reference into the next call.
>>>>>>
>>>>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com  <ma...@windofkeltia.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Mike,
>>>>>>>
>>>>>>> Many thanks for responding. Do you mean to say that all I have to do
>>>> is
>>>>>>> something like this?
>>>>>>>
>>>>>>>       public void onTrigger( final ProcessContext context, final
>>>>>>>       ProcessSession session ) throws ProcessException
>>>>>>>       {
>>>>>>>          FlowFile flowfile = session.get();
>>>>>>>          ...
>>>>>>>
>>>>>>>          // this is will be our resulting flowfile...
>>>>>>>          AtomicReference< OutputStream > savedOutputStream = new
>>>>>>>       AtomicReference<>();
>>>>>>>
>>>>>>>          /* Do some processing on the in-coming flowfile then close its
>>>>>>>       input stream, but
>>>>>>>           * save the output stream for continued use.
>>>>>>>           */
>>>>>>>       *  session.write( flowfile, new InputStreamCallback()*
>>>>>>>          {
>>>>>>>            @Override
>>>>>>>       *    public void process( InputStream inputStream, OutputStream
>>>>>>>       outputStream ) throws IOException*
>>>>>>>            {
>>>>>>>              savedOutputStream.set( outputStream );
>>>>>>>              ...
>>>>>>>
>>>>>>>              // processing puts some output on the output stream...
>>>>>>>              outputStream.write( etc. );
>>>>>>>
>>>>>>>              inputStream.close();
>>>>>>>            }
>>>>>>>       *  } );*
>>>>>>>
>>>>>>>          /* Start over doing different processing on the
>>>> (same/reopened)
>>>>>>>       in-coming flowfile
>>>>>>>           * continuing to use the original output stream. It's our
>>>>>>>       responsibility to close
>>>>>>>           * the saved output stream, NiFi closes the unused output
>>>> stream
>>>>>>>       opened, but
>>>>>>>           * ignored by us.
>>>>>>>           */
>>>>>>>       *  session.write( flowfile, new StreamCallback()*
>>>>>>>          {
>>>>>>>            @Override
>>>>>>>       *    public void process( InputStream inputStream, OutputStream
>>>>>>>       outputStream ) throws IOException*
>>>>>>>            {
>>>>>>>              outputStream = savedOutputStream.get(); // (discard the
>>>> new
>>>>>>>       output stream)
>>>>>>>              ...
>>>>>>>
>>>>>>>              // processing puts (some more) output on the original
>>>> output
>>>>>>>       stream...
>>>>>>>              outputStream.write( etc. );
>>>>>>>
>>>>>>>              outputStream.close();
>>>>>>>            }
>>>>>>>       *  } );*
>>>>>>>
>>>>>>>          session.transfer( flowfile, etc. );
>>>>>>>       }
>>>>>>>
>>>>>>> I'm wondering if this will work to "discard" the new output stream
>>>>>>> opened for me (the second time) and replace it with the original one
>>>>>>> which was probably closed when the first call to
>>>>>>> session.write()finished. What's on these streams is way too big for me
>>>>>>> to put them into temporary memory, say, a ByteArrayOutputStream.
>>>>>>>
>>>>>>> Russ
>>>>>>>
>>>>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>>>>> session.read(FlowFile) just gives you an InputStream. You should be
>>>>> able
>>>>>>> to
>>>>>>>> rerun that as many times as you want provided you properly close it.
>>>>>>>>
>>>>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
>>>>> russ@windofkeltia.com  <ma...@windofkeltia.com>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> In my custom processor, I'm using a SAX parser to process an
>>>> incoming
>>>>>>>>> flowfile that's in XML. Except that, this particular XML is in
>>>> essence
>>>>>>>>> two different files and I would like to split, read and process the
>>>>>>>>> first "half", which starts a couple of lines (XML elements) into the
>>>>>>>>> file) not using the SAX parser. At the end, I would stream the
>>>> output
>>>>> of
>>>>>>>>> the first half, then the SAX-processed second half.
>>>>>>>>>
>>>>>>>>> So, in short:
>>>>>>>>>
>>>>>>>>>     1. process the incoming flowfile for the early content not using
>>>>> SAX,
>>>>>>>>>        but merely copying as-is; at all cost I must avoid
>>>>> "reassembling"
>>>>>>>>>        the first half using my SAX handler (what I'm doing now),
>>>>>>>>>     2. output the first part down the output stream to the resulting
>>>>>>> flowfile,
>>>>>>>>>     3. (re)process the incoming flowfile using SAX (and I can just
>>>> skip
>>>>>>>>>        over the first bit) and spitting the result of this second
>>>> part
>>>>> out
>>>>>>>>>        down the output stream of the resulting flowfile.
>>>>>>>>>
>>>>>>>>> I guess this is tantamount to asking how, in Java, I can read an
>>>> input
>>>>>>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>>>>>>> developer question and more a Java question. I have looked at it
>>>> that
>>>>>>>>> way too, but, if one of you knows (particularly NiFi) best
>>>> practice, I
>>>>>>>>> would very much like to hear about it.
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>>
>>
>> <ReadSplitWrite.java><ReadSplitWriteTest.java>
>


Re: Reading the incoming flowfile "twice"

Posted by Mark Payne <ma...@hotmail.com>.
Russ,

As far as I can tell, this is working exactly as expected.

To verify, I created a simple Integration test, as well, which I attached below.

Let me outline what I *think* you’re trying to do here and please correct me if I’m wrong:

1. Read the content of the FlowFile. (Via session.read)
2. Overwrite the content of the FlowFile. (This is done by session.write)
3. Overwrite the content of the FlowFile again. (Via session.write)

The third step is the part where I’m confused. You’re calling session.write() again. In the callback, you’ll receive an InputStream that contains the contents of the FlowFile (which have now been modified, per Step 2). You’re also given an OutputStream to write the new content to.
If you then return without writing anything to the OutputStream, as in the example that you attached, then yes, you’ll have erased all of the FlowFile’s content.

It’s unclear to me exactly what you’re attempting to accomplish in the third step. It *sounds* like you’re expecting the content of the original/incoming FlowFile. But you’re not going to get that because you’ve already overwritten that FlowFile’s content. If that is what you’re trying to do, I think what you’d want to do is something more like this:

FlowFile original = session.get();
If (original == null) {
  return;
}

session.read(original, new InputStreamCallback() {…});

FlowFile childFlowFile = session.create(original); // Create a ‘child’ flow file whose content is equal to the original FlowFile’s content.
session.write(childFlowFile, new StreamCallback() {…});

// Read the original FlowFile’s content
session.read(original, new InputStreamCallback() { … });

session.transfer(childFlowFile, REL_SUCCESS);
session.remove(original); // or transfer to an ‘original’ relationship or whatever makes sense for you.



Hope this helps!
-Mark




On Mar 30, 2020, at 4:23 PM, Russell Bateman <ru...@windofkeltia.com>> wrote:

If I haven't worn out my welcome, here is the simplified code that should demonstrate either that I have miscoded your suggestions or that the API doesn't in fact work as advertised. First, the output. The code, both JUnit test and processor are attached and the files are pretty small.

Much thanks,
Russ

This is the input stream first time around (before copying) ===================================
* * * session.read( flowfile );
      Here's what's in input stream:
<cxml>
  <document>
    This is the original document.
  </document>
  <metadata>
    <date_of_service>2016-06-28 13:23</date_of_service>
  </metadata>
  <demographics>
    <date_of_birth>1980-07-01</date_of_birth>
    <age>36</age>
  </demographics>
</cxml>

And now, let's copy some of the input stream to the output stream =============================
* * * flowfile = session.write( flowfile, new StreamCallback() ...
      Copying input stream to output stream up to </document>...
      The output stream has in it at this point:
<cxml>
  <document>
    This is the original document.
  </document>

[1. When we examine the output stream, it has what we expect.]

After copying, can we reopen input stream intact and does outputstream have what we think? ====
* * * flowfile = session.write( flowfile, new StreamCallback() ...
      Here's what's in input stream:
<cxml>
  <document>
    This is the original document.
  </document>

[2. The input stream as reported just above is truncated by exactly the content we did
      not copy to the output stream. We expected to see the entire, original file, but the
      second half is gone.]

      Here's what's in the output stream at this point:
 (nothing)

[3. The content we copied to the output stream has disappeared. Does it disappear simply
    because we looked at it (printed it out here)?]


On 3/29/20 5:05 AM, Joe Witt wrote:

Russell

I recommend writing very simple code that does two successive read/write
operations on basic data so you can make sure the api work/as expected.
Then add the xml bits.

Thanks

On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <mi...@gmail.com> wrote:



If these files are only a few MB at the most, you can also just export them
to a ByteArrayOutputStream. Just a thought.

On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <ru...@windofkeltia.com>
wrote:



Joe and Mike,

Sadly, I was not able to get very far on this. It seems that the extend
to which I copy the first half of the contents of the input stream, I
lose what comes after when I try to read again, basically, the second
half comprising the <metadata>and <demographics>elements which I was
hoping to SAX-parse. Here's code and output. I have highlighted the
output to make it easier to read.

? <#>
|try|
|{|
|||InputStream inputStream = session.read( flowfile );|
|||System.out.println( ||"This is the input stream first time around
(before copying to output stream)..."| |);|
|||System.out.println( StreamUtilities.fromStream( inputStream ) );|
|||inputStream.close();|
|}|
|catch||( IOException e )|
|{|
|||e.printStackTrace();|
|}|
|flowfile = session.write( flowfile, ||new| |StreamCallback()|
|{|
|||@Override|
|||public| |void| |process( InputStream inputStream, OutputStream
outputStream ) ||throws| |IOException|
|||{|
|||System.out.println( ||"And now, let's copy..."| |);|
|||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
outputStream );|
|||}|
|} );|
|try|
|{|
|||InputStream inputStream = session.read( flowfile );|
|||System.out.println( ||"This is the input stream second time around
(after copying)..."| |);|
|||System.out.println( StreamUtilities.fromStream( inputStream ) );|
|||inputStream.close();|
|}|
|catch||( IOException e )|
|{|
|||e.printStackTrace();|
|}|
|// ...on to SAX parser which dies because the input has been truncated


to|


|// exactly what was written out to the output stream|


Output of above:

This is the input stream first time around (before copying to output
stream)...
<cxml>
   <document>
     This is the original document.
   </document>
   <metadata>
     <date_of_service>2016-06-28 13:23</date_of_service>
   </metadata>
   <demographics>
     <date_of_birth>1980-07-01</date_of_birth>
     <age>36</age>
   </demographics>
</cxml>

And now, let's copy...
This is the input stream second time around (after copying)...
<cxml>
   <document>
     This is the original document.
   </document>
And now, we'll go on to the SAX parser...
<cxml> <document> This is the original document. </document>
[pool-1-thread-1] ERROR [...] SAX ruleparser error:
org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
document structures must start and end within the same entity.


I left off the code that prints, "And now, we'll go on to the SAX
parser..." It's in the next flowfile = session.write( ... ). I have unit
tests that verify the good functioning of
copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
"file" is truncated; SAX finds the first "half" just fine, but there is
no second "half". If I comment out copying from input stream to output
stream, the error doesn't occur--the whole document is there.

Thanks for looking at this again if you can,
Russ

On 3/27/20 3:08 PM, Joe Witt wrote:


you should be able to call write as many times as you need.  just keep
using the resulting flowfile reference into the next call.

On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <ru...@windofkeltia.com>


wrote:



Mike,

Many thanks for responding. Do you mean to say that all I have to do


is


something like this?

     public void onTrigger( final ProcessContext context, final
     ProcessSession session ) throws ProcessException
     {
        FlowFile flowfile = session.get();
        ...

        // this is will be our resulting flowfile...
        AtomicReference< OutputStream > savedOutputStream = new
     AtomicReference<>();

        /* Do some processing on the in-coming flowfile then close its
     input stream, but
         * save the output stream for continued use.
         */
     *  session.write( flowfile, new InputStreamCallback()*
        {
          @Override
     *    public void process( InputStream inputStream, OutputStream
     outputStream ) throws IOException*
          {
            savedOutputStream.set( outputStream );
            ...

            // processing puts some output on the output stream...
            outputStream.write( etc. );

            inputStream.close();
          }
     *  } );*

        /* Start over doing different processing on the


(same/reopened)


     in-coming flowfile
         * continuing to use the original output stream. It's our
     responsibility to close
         * the saved output stream, NiFi closes the unused output


stream


     opened, but
         * ignored by us.
         */
     *  session.write( flowfile, new StreamCallback()*
        {
          @Override
     *    public void process( InputStream inputStream, OutputStream
     outputStream ) throws IOException*
          {
            outputStream = savedOutputStream.get(); // (discard the


new


     output stream)
            ...

            // processing puts (some more) output on the original


output


     stream...
            outputStream.write( etc. );

            outputStream.close();
          }
     *  } );*

        session.transfer( flowfile, etc. );
     }

I'm wondering if this will work to "discard" the new output stream
opened for me (the second time) and replace it with the original one
which was probably closed when the first call to
session.write()finished. What's on these streams is way too big for me
to put them into temporary memory, say, a ByteArrayOutputStream.

Russ

On 3/27/20 10:03 AM, Mike Thomsen wrote:


session.read(FlowFile) just gives you an InputStream. You should be


able


to


rerun that as many times as you want provided you properly close it.

On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <


russ@windofkeltia.com<ma...@windofkeltia.com>>


wrote:



In my custom processor, I'm using a SAX parser to process an


incoming


flowfile that's in XML. Except that, this particular XML is in


essence


two different files and I would like to split, read and process the
first "half", which starts a couple of lines (XML elements) into the
file) not using the SAX parser. At the end, I would stream the


output


of


the first half, then the SAX-processed second half.

So, in short:

   1. process the incoming flowfile for the early content not using


SAX,


      but merely copying as-is; at all cost I must avoid


"reassembling"


      the first half using my SAX handler (what I'm doing now),
   2. output the first part down the output stream to the resulting


flowfile,


   3. (re)process the incoming flowfile using SAX (and I can just


skip


      over the first bit) and spitting the result of this second


part


out


      down the output stream of the resulting flowfile.

I guess this is tantamount to asking how, in Java, I can read an


input


stream twice (or one-half plus one times). Maybe it's less a NiFi
developer question and more a Java question. I have looked at it


that


way too, but, if one of you knows (particularly NiFi) best


practice, I


would very much like to hear about it.

Thanks.







<ReadSplitWrite.java><ReadSplitWriteTest.java>


Re: Reading the incoming flowfile "twice"

Posted by Russell Bateman <ru...@windofkeltia.com>.
If I haven't worn out my welcome, here is the simplified code that 
should demonstrate either that I have miscoded your suggestions or that 
the API doesn't in fact work as advertised. First, the output. The code, 
both JUnit test and processor are attached and the files are pretty small.

Much thanks,
Russ

This is the input stream first time around (before copying) 
===================================
* * * session.read( flowfile );
       Here's what's in input stream:
*<cxml>**
**  <document>**
**    This is the original document.**
**  </document>**
**  <metadata>**
**    <date_of_service>2016-06-28 13:23</date_of_service>**
**  </metadata>**
**  <demographics>**
**<date_of_birth>1980-07-01</date_of_birth>**
**    <age>36</age>**
**  </demographics>**
**</cxml>*

And now, let's copy some of the input stream to the output stream 
=============================
* * * flowfile = session.write( flowfile, new StreamCallback() ...
       Copying input stream to output stream up to </document>...
       The output stream has in it at this point:
*<cxml>**
**  <document>**
**    This is the original document.**
**  </document>**
*
[1. When we examine the output stream, it has what we expect.]

After copying, can we reopen input stream intact and does outputstream 
have what we think? ====
* * * flowfile = session.write( flowfile, new StreamCallback() ...
       Here's what's in input stream:
*<cxml>**
**  <document>**
**    This is the original document.**
**  </document>*

[2. The input stream as reported just above is truncated by exactly the 
content we did
       not copy to the output stream. We expected to see the entire, 
original file, but the
       second half is gone.]

       Here's what's in the output stream at this point:
* (nothing)*

[3. The content we copied to the output stream has disappeared. Does it 
disappear simply
     because we looked at it (printed it out here)?]


On 3/29/20 5:05 AM, Joe Witt wrote:
> Russell
>
> I recommend writing very simple code that does two successive read/write
> operations on basic data so you can make sure the api work/as expected.
> Then add the xml bits.
>
> Thanks
>
> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <mi...@gmail.com> wrote:
>
>> If these files are only a few MB at the most, you can also just export them
>> to a ByteArrayOutputStream. Just a thought.
>>
>> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <ru...@windofkeltia.com>
>> wrote:
>>
>>> Joe and Mike,
>>>
>>> Sadly, I was not able to get very far on this. It seems that the extend
>>> to which I copy the first half of the contents of the input stream, I
>>> lose what comes after when I try to read again, basically, the second
>>> half comprising the <metadata>and <demographics>elements which I was
>>> hoping to SAX-parse. Here's code and output. I have highlighted the
>>> output to make it easier to read.
>>>
>>> ? <#>
>>> |try|
>>> |{|
>>> |||InputStream inputStream = session.read( flowfile );|
>>> |||System.out.println( ||"This is the input stream first time around
>>> (before copying to output stream)..."| |);|
>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>> |||inputStream.close();|
>>> |}|
>>> |catch||( IOException e )|
>>> |{|
>>> |||e.printStackTrace();|
>>> |}|
>>> |flowfile = session.write( flowfile, ||new| |StreamCallback()|
>>> |{|
>>> |||@Override|
>>> |||public| |void| |process( InputStream inputStream, OutputStream
>>> outputStream ) ||throws| |IOException|
>>> |||{|
>>> |||System.out.println( ||"And now, let's copy..."| |);|
>>> |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
>>> outputStream );|
>>> |||}|
>>> |} );|
>>> |try|
>>> |{|
>>> |||InputStream inputStream = session.read( flowfile );|
>>> |||System.out.println( ||"This is the input stream second time around
>>> (after copying)..."| |);|
>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>> |||inputStream.close();|
>>> |}|
>>> |catch||( IOException e )|
>>> |{|
>>> |||e.printStackTrace();|
>>> |}|
>>> |// ...on to SAX parser which dies because the input has been truncated
>> to|
>>> |// exactly what was written out to the output stream|
>>>
>>>
>>> Output of above:
>>>
>>> This is the input stream first time around (before copying to output
>>> stream)...
>>> <cxml>
>>>     <document>
>>>       This is the original document.
>>>     </document>
>>>     <metadata>
>>>       <date_of_service>2016-06-28 13:23</date_of_service>
>>>     </metadata>
>>>     <demographics>
>>>       <date_of_birth>1980-07-01</date_of_birth>
>>>       <age>36</age>
>>>     </demographics>
>>> </cxml>
>>>
>>> And now, let's copy...
>>> This is the input stream second time around (after copying)...
>>> <cxml>
>>>     <document>
>>>       This is the original document.
>>>     </document>
>>> And now, we'll go on to the SAX parser...
>>> <cxml> <document> This is the original document. </document>
>>> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>> document structures must start and end within the same entity.
>>>
>>>
>>> I left off the code that prints, "And now, we'll go on to the SAX
>>> parser..." It's in the next flowfile = session.write( ... ). I have unit
>>> tests that verify the good functioning of
>>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>>> "file" is truncated; SAX finds the first "half" just fine, but there is
>>> no second "half". If I comment out copying from input stream to output
>>> stream, the error doesn't occur--the whole document is there.
>>>
>>> Thanks for looking at this again if you can,
>>> Russ
>>>
>>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>>> you should be able to call write as many times as you need.  just keep
>>>> using the resulting flowfile reference into the next call.
>>>>
>>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com
>>>> wrote:
>>>>
>>>>> Mike,
>>>>>
>>>>> Many thanks for responding. Do you mean to say that all I have to do
>> is
>>>>> something like this?
>>>>>
>>>>>       public void onTrigger( final ProcessContext context, final
>>>>>       ProcessSession session ) throws ProcessException
>>>>>       {
>>>>>          FlowFile flowfile = session.get();
>>>>>          ...
>>>>>
>>>>>          // this is will be our resulting flowfile...
>>>>>          AtomicReference< OutputStream > savedOutputStream = new
>>>>>       AtomicReference<>();
>>>>>
>>>>>          /* Do some processing on the in-coming flowfile then close its
>>>>>       input stream, but
>>>>>           * save the output stream for continued use.
>>>>>           */
>>>>>       *  session.write( flowfile, new InputStreamCallback()*
>>>>>          {
>>>>>            @Override
>>>>>       *    public void process( InputStream inputStream, OutputStream
>>>>>       outputStream ) throws IOException*
>>>>>            {
>>>>>              savedOutputStream.set( outputStream );
>>>>>              ...
>>>>>
>>>>>              // processing puts some output on the output stream...
>>>>>              outputStream.write( etc. );
>>>>>
>>>>>              inputStream.close();
>>>>>            }
>>>>>       *  } );*
>>>>>
>>>>>          /* Start over doing different processing on the
>> (same/reopened)
>>>>>       in-coming flowfile
>>>>>           * continuing to use the original output stream. It's our
>>>>>       responsibility to close
>>>>>           * the saved output stream, NiFi closes the unused output
>> stream
>>>>>       opened, but
>>>>>           * ignored by us.
>>>>>           */
>>>>>       *  session.write( flowfile, new StreamCallback()*
>>>>>          {
>>>>>            @Override
>>>>>       *    public void process( InputStream inputStream, OutputStream
>>>>>       outputStream ) throws IOException*
>>>>>            {
>>>>>              outputStream = savedOutputStream.get(); // (discard the
>> new
>>>>>       output stream)
>>>>>              ...
>>>>>
>>>>>              // processing puts (some more) output on the original
>> output
>>>>>       stream...
>>>>>              outputStream.write( etc. );
>>>>>
>>>>>              outputStream.close();
>>>>>            }
>>>>>       *  } );*
>>>>>
>>>>>          session.transfer( flowfile, etc. );
>>>>>       }
>>>>>
>>>>> I'm wondering if this will work to "discard" the new output stream
>>>>> opened for me (the second time) and replace it with the original one
>>>>> which was probably closed when the first call to
>>>>> session.write()finished. What's on these streams is way too big for me
>>>>> to put them into temporary memory, say, a ByteArrayOutputStream.
>>>>>
>>>>> Russ
>>>>>
>>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>>> session.read(FlowFile) just gives you an InputStream. You should be
>>> able
>>>>> to
>>>>>> rerun that as many times as you want provided you properly close it.
>>>>>>
>>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
>>> russ@windofkeltia.com>
>>>>>> wrote:
>>>>>>
>>>>>>> In my custom processor, I'm using a SAX parser to process an
>> incoming
>>>>>>> flowfile that's in XML. Except that, this particular XML is in
>> essence
>>>>>>> two different files and I would like to split, read and process the
>>>>>>> first "half", which starts a couple of lines (XML elements) into the
>>>>>>> file) not using the SAX parser. At the end, I would stream the
>> output
>>> of
>>>>>>> the first half, then the SAX-processed second half.
>>>>>>>
>>>>>>> So, in short:
>>>>>>>
>>>>>>>     1. process the incoming flowfile for the early content not using
>>> SAX,
>>>>>>>        but merely copying as-is; at all cost I must avoid
>>> "reassembling"
>>>>>>>        the first half using my SAX handler (what I'm doing now),
>>>>>>>     2. output the first part down the output stream to the resulting
>>>>> flowfile,
>>>>>>>     3. (re)process the incoming flowfile using SAX (and I can just
>> skip
>>>>>>>        over the first bit) and spitting the result of this second
>> part
>>> out
>>>>>>>        down the output stream of the resulting flowfile.
>>>>>>>
>>>>>>> I guess this is tantamount to asking how, in Java, I can read an
>> input
>>>>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>>>>> developer question and more a Java question. I have looked at it
>> that
>>>>>>> way too, but, if one of you knows (particularly NiFi) best
>> practice, I
>>>>>>> would very much like to hear about it.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>>
>>>


Re: Reading the incoming flowfile "twice"

Posted by Joe Witt <jo...@gmail.com>.
Russell

I recommend writing very simple code that does two successive read/write
operations on basic data so you can make sure the api work/as expected.
Then add the xml bits.

Thanks

On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <mi...@gmail.com> wrote:

> If these files are only a few MB at the most, you can also just export them
> to a ByteArrayOutputStream. Just a thought.
>
> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <ru...@windofkeltia.com>
> wrote:
>
> > Joe and Mike,
> >
> > Sadly, I was not able to get very far on this. It seems that the extend
> > to which I copy the first half of the contents of the input stream, I
> > lose what comes after when I try to read again, basically, the second
> > half comprising the <metadata>and <demographics>elements which I was
> > hoping to SAX-parse. Here's code and output. I have highlighted the
> > output to make it easier to read.
> >
> > ? <#>
> > |try|
> > |{|
> > |||InputStream inputStream = session.read( flowfile );|
> > |||System.out.println( ||"This is the input stream first time around
> > (before copying to output stream)..."| |);|
> > |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
> > |||inputStream.close();|
> > |}|
> > |catch||( IOException e )|
> > |{|
> > |||e.printStackTrace();|
> > |}|
> > |flowfile = session.write( flowfile, ||new| |StreamCallback()|
> > |{|
> > |||@Override|
> > |||public| |void| |process( InputStream inputStream, OutputStream
> > outputStream ) ||throws| |IOException|
> > |||{|
> > |||System.out.println( ||"And now, let's copy..."| |);|
> > |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
> > outputStream );|
> > |||}|
> > |} );|
> > |try|
> > |{|
> > |||InputStream inputStream = session.read( flowfile );|
> > |||System.out.println( ||"This is the input stream second time around
> > (after copying)..."| |);|
> > |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
> > |||inputStream.close();|
> > |}|
> > |catch||( IOException e )|
> > |{|
> > |||e.printStackTrace();|
> > |}|
> > |// ...on to SAX parser which dies because the input has been truncated
> to|
> > |// exactly what was written out to the output stream|
> >
> >
> > Output of above:
> >
> > This is the input stream first time around (before copying to output
> > stream)...
> > <cxml>
> >    <document>
> >      This is the original document.
> >    </document>
> >    <metadata>
> >      <date_of_service>2016-06-28 13:23</date_of_service>
> >    </metadata>
> >    <demographics>
> >      <date_of_birth>1980-07-01</date_of_birth>
> >      <age>36</age>
> >    </demographics>
> > </cxml>
> >
> > And now, let's copy...
> > This is the input stream second time around (after copying)...
> > <cxml>
> >    <document>
> >      This is the original document.
> >    </document>
> > And now, we'll go on to the SAX parser...
> > <cxml> <document> This is the original document. </document>
> > [pool-1-thread-1] ERROR [...] SAX ruleparser error:
> > org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
> > document structures must start and end within the same entity.
> >
> >
> > I left off the code that prints, "And now, we'll go on to the SAX
> > parser..." It's in the next flowfile = session.write( ... ). I have unit
> > tests that verify the good functioning of
> > copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
> > "file" is truncated; SAX finds the first "half" just fine, but there is
> > no second "half". If I comment out copying from input stream to output
> > stream, the error doesn't occur--the whole document is there.
> >
> > Thanks for looking at this again if you can,
> > Russ
> >
> > On 3/27/20 3:08 PM, Joe Witt wrote:
> > > you should be able to call write as many times as you need.  just keep
> > > using the resulting flowfile reference into the next call.
> > >
> > > On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com
> >
> > > wrote:
> > >
> > >> Mike,
> > >>
> > >> Many thanks for responding. Do you mean to say that all I have to do
> is
> > >> something like this?
> > >>
> > >>      public void onTrigger( final ProcessContext context, final
> > >>      ProcessSession session ) throws ProcessException
> > >>      {
> > >>         FlowFile flowfile = session.get();
> > >>         ...
> > >>
> > >>         // this is will be our resulting flowfile...
> > >>         AtomicReference< OutputStream > savedOutputStream = new
> > >>      AtomicReference<>();
> > >>
> > >>         /* Do some processing on the in-coming flowfile then close its
> > >>      input stream, but
> > >>          * save the output stream for continued use.
> > >>          */
> > >>      *  session.write( flowfile, new InputStreamCallback()*
> > >>         {
> > >>           @Override
> > >>      *    public void process( InputStream inputStream, OutputStream
> > >>      outputStream ) throws IOException*
> > >>           {
> > >>             savedOutputStream.set( outputStream );
> > >>             ...
> > >>
> > >>             // processing puts some output on the output stream...
> > >>             outputStream.write( etc. );
> > >>
> > >>             inputStream.close();
> > >>           }
> > >>      *  } );*
> > >>
> > >>         /* Start over doing different processing on the
> (same/reopened)
> > >>      in-coming flowfile
> > >>          * continuing to use the original output stream. It's our
> > >>      responsibility to close
> > >>          * the saved output stream, NiFi closes the unused output
> stream
> > >>      opened, but
> > >>          * ignored by us.
> > >>          */
> > >>      *  session.write( flowfile, new StreamCallback()*
> > >>         {
> > >>           @Override
> > >>      *    public void process( InputStream inputStream, OutputStream
> > >>      outputStream ) throws IOException*
> > >>           {
> > >>             outputStream = savedOutputStream.get(); // (discard the
> new
> > >>      output stream)
> > >>             ...
> > >>
> > >>             // processing puts (some more) output on the original
> output
> > >>      stream...
> > >>             outputStream.write( etc. );
> > >>
> > >>             outputStream.close();
> > >>           }
> > >>      *  } );*
> > >>
> > >>         session.transfer( flowfile, etc. );
> > >>      }
> > >>
> > >> I'm wondering if this will work to "discard" the new output stream
> > >> opened for me (the second time) and replace it with the original one
> > >> which was probably closed when the first call to
> > >> session.write()finished. What's on these streams is way too big for me
> > >> to put them into temporary memory, say, a ByteArrayOutputStream.
> > >>
> > >> Russ
> > >>
> > >> On 3/27/20 10:03 AM, Mike Thomsen wrote:
> > >>> session.read(FlowFile) just gives you an InputStream. You should be
> > able
> > >> to
> > >>> rerun that as many times as you want provided you properly close it.
> > >>>
> > >>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
> > russ@windofkeltia.com>
> > >>> wrote:
> > >>>
> > >>>> In my custom processor, I'm using a SAX parser to process an
> incoming
> > >>>> flowfile that's in XML. Except that, this particular XML is in
> essence
> > >>>> two different files and I would like to split, read and process the
> > >>>> first "half", which starts a couple of lines (XML elements) into the
> > >>>> file) not using the SAX parser. At the end, I would stream the
> output
> > of
> > >>>> the first half, then the SAX-processed second half.
> > >>>>
> > >>>> So, in short:
> > >>>>
> > >>>>    1. process the incoming flowfile for the early content not using
> > SAX,
> > >>>>       but merely copying as-is; at all cost I must avoid
> > "reassembling"
> > >>>>       the first half using my SAX handler (what I'm doing now),
> > >>>>    2. output the first part down the output stream to the resulting
> > >> flowfile,
> > >>>>    3. (re)process the incoming flowfile using SAX (and I can just
> skip
> > >>>>       over the first bit) and spitting the result of this second
> part
> > out
> > >>>>       down the output stream of the resulting flowfile.
> > >>>>
> > >>>> I guess this is tantamount to asking how, in Java, I can read an
> input
> > >>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
> > >>>> developer question and more a Java question. I have looked at it
> that
> > >>>> way too, but, if one of you knows (particularly NiFi) best
> practice, I
> > >>>> would very much like to hear about it.
> > >>>>
> > >>>> Thanks.
> > >>>>
> > >>>>
> > >>
> >
> >
>

Re: Reading the incoming flowfile "twice"

Posted by Russell Bateman <ru...@windofkeltia.com>.
No, this test file is tiny. The real thing is usually megabytes in size.

On Sun, Mar 29, 2020 at 3:15 AM Mike Thomsen <mi...@gmail.com> wrote:

> If these files are only a few MB at the most, you can also just export them
> to a ByteArrayOutputStream. Just a thought.
>
> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <ru...@windofkeltia.com>
> wrote:
>
> > Joe and Mike,
> >
> > Sadly, I was not able to get very far on this. It seems that the extend
> > to which I copy the first half of the contents of the input stream, I
> > lose what comes after when I try to read again, basically, the second
> > half comprising the <metadata>and <demographics>elements which I was
> > hoping to SAX-parse. Here's code and output. I have highlighted the
> > output to make it easier to read.
> >
> > ? <#>
> > |try|
> > |{|
> > |||InputStream inputStream = session.read( flowfile );|
> > |||System.out.println( ||"This is the input stream first time around
> > (before copying to output stream)..."| |);|
> > |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
> > |||inputStream.close();|
> > |}|
> > |catch||( IOException e )|
> > |{|
> > |||e.printStackTrace();|
> > |}|
> > |flowfile = session.write( flowfile, ||new| |StreamCallback()|
> > |{|
> > |||@Override|
> > |||public| |void| |process( InputStream inputStream, OutputStream
> > outputStream ) ||throws| |IOException|
> > |||{|
> > |||System.out.println( ||"And now, let's copy..."| |);|
> > |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
> > outputStream );|
> > |||}|
> > |} );|
> > |try|
> > |{|
> > |||InputStream inputStream = session.read( flowfile );|
> > |||System.out.println( ||"This is the input stream second time around
> > (after copying)..."| |);|
> > |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
> > |||inputStream.close();|
> > |}|
> > |catch||( IOException e )|
> > |{|
> > |||e.printStackTrace();|
> > |}|
> > |// ...on to SAX parser which dies because the input has been truncated
> to|
> > |// exactly what was written out to the output stream|
> >
> >
> > Output of above:
> >
> > This is the input stream first time around (before copying to output
> > stream)...
> > <cxml>
> >    <document>
> >      This is the original document.
> >    </document>
> >    <metadata>
> >      <date_of_service>2016-06-28 13:23</date_of_service>
> >    </metadata>
> >    <demographics>
> >      <date_of_birth>1980-07-01</date_of_birth>
> >      <age>36</age>
> >    </demographics>
> > </cxml>
> >
> > And now, let's copy...
> > This is the input stream second time around (after copying)...
> > <cxml>
> >    <document>
> >      This is the original document.
> >    </document>
> > And now, we'll go on to the SAX parser...
> > <cxml> <document> This is the original document. </document>
> > [pool-1-thread-1] ERROR [...] SAX ruleparser error:
> > org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
> > document structures must start and end within the same entity.
> >
> >
> > I left off the code that prints, "And now, we'll go on to the SAX
> > parser..." It's in the next flowfile = session.write( ... ). I have unit
> > tests that verify the good functioning of
> > copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
> > "file" is truncated; SAX finds the first "half" just fine, but there is
> > no second "half". If I comment out copying from input stream to output
> > stream, the error doesn't occur--the whole document is there.
> >
> > Thanks for looking at this again if you can,
> > Russ
> >
> > On 3/27/20 3:08 PM, Joe Witt wrote:
> > > you should be able to call write as many times as you need.  just keep
> > > using the resulting flowfile reference into the next call.
> > >
> > > On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com
> >
> > > wrote:
> > >
> > >> Mike,
> > >>
> > >> Many thanks for responding. Do you mean to say that all I have to do
> is
> > >> something like this?
> > >>
> > >>      public void onTrigger( final ProcessContext context, final
> > >>      ProcessSession session ) throws ProcessException
> > >>      {
> > >>         FlowFile flowfile = session.get();
> > >>         ...
> > >>
> > >>         // this is will be our resulting flowfile...
> > >>         AtomicReference< OutputStream > savedOutputStream = new
> > >>      AtomicReference<>();
> > >>
> > >>         /* Do some processing on the in-coming flowfile then close its
> > >>      input stream, but
> > >>          * save the output stream for continued use.
> > >>          */
> > >>      *  session.write( flowfile, new InputStreamCallback()*
> > >>         {
> > >>           @Override
> > >>      *    public void process( InputStream inputStream, OutputStream
> > >>      outputStream ) throws IOException*
> > >>           {
> > >>             savedOutputStream.set( outputStream );
> > >>             ...
> > >>
> > >>             // processing puts some output on the output stream...
> > >>             outputStream.write( etc. );
> > >>
> > >>             inputStream.close();
> > >>           }
> > >>      *  } );*
> > >>
> > >>         /* Start over doing different processing on the
> (same/reopened)
> > >>      in-coming flowfile
> > >>          * continuing to use the original output stream. It's our
> > >>      responsibility to close
> > >>          * the saved output stream, NiFi closes the unused output
> stream
> > >>      opened, but
> > >>          * ignored by us.
> > >>          */
> > >>      *  session.write( flowfile, new StreamCallback()*
> > >>         {
> > >>           @Override
> > >>      *    public void process( InputStream inputStream, OutputStream
> > >>      outputStream ) throws IOException*
> > >>           {
> > >>             outputStream = savedOutputStream.get(); // (discard the
> new
> > >>      output stream)
> > >>             ...
> > >>
> > >>             // processing puts (some more) output on the original
> output
> > >>      stream...
> > >>             outputStream.write( etc. );
> > >>
> > >>             outputStream.close();
> > >>           }
> > >>      *  } );*
> > >>
> > >>         session.transfer( flowfile, etc. );
> > >>      }
> > >>
> > >> I'm wondering if this will work to "discard" the new output stream
> > >> opened for me (the second time) and replace it with the original one
> > >> which was probably closed when the first call to
> > >> session.write()finished. What's on these streams is way too big for me
> > >> to put them into temporary memory, say, a ByteArrayOutputStream.
> > >>
> > >> Russ
> > >>
> > >> On 3/27/20 10:03 AM, Mike Thomsen wrote:
> > >>> session.read(FlowFile) just gives you an InputStream. You should be
> > able
> > >> to
> > >>> rerun that as many times as you want provided you properly close it.
> > >>>
> > >>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
> > russ@windofkeltia.com>
> > >>> wrote:
> > >>>
> > >>>> In my custom processor, I'm using a SAX parser to process an
> incoming
> > >>>> flowfile that's in XML. Except that, this particular XML is in
> essence
> > >>>> two different files and I would like to split, read and process the
> > >>>> first "half", which starts a couple of lines (XML elements) into the
> > >>>> file) not using the SAX parser. At the end, I would stream the
> output
> > of
> > >>>> the first half, then the SAX-processed second half.
> > >>>>
> > >>>> So, in short:
> > >>>>
> > >>>>    1. process the incoming flowfile for the early content not using
> > SAX,
> > >>>>       but merely copying as-is; at all cost I must avoid
> > "reassembling"
> > >>>>       the first half using my SAX handler (what I'm doing now),
> > >>>>    2. output the first part down the output stream to the resulting
> > >> flowfile,
> > >>>>    3. (re)process the incoming flowfile using SAX (and I can just
> skip
> > >>>>       over the first bit) and spitting the result of this second
> part
> > out
> > >>>>       down the output stream of the resulting flowfile.
> > >>>>
> > >>>> I guess this is tantamount to asking how, in Java, I can read an
> input
> > >>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
> > >>>> developer question and more a Java question. I have looked at it
> that
> > >>>> way too, but, if one of you knows (particularly NiFi) best
> practice, I
> > >>>> would very much like to hear about it.
> > >>>>
> > >>>> Thanks.
> > >>>>
> > >>>>
> > >>
> >
> >
>

Re: Reading the incoming flowfile "twice"

Posted by Mike Thomsen <mi...@gmail.com>.
If these files are only a few MB at the most, you can also just export them
to a ByteArrayOutputStream. Just a thought.

On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <ru...@windofkeltia.com>
wrote:

> Joe and Mike,
>
> Sadly, I was not able to get very far on this. It seems that the extend
> to which I copy the first half of the contents of the input stream, I
> lose what comes after when I try to read again, basically, the second
> half comprising the <metadata>and <demographics>elements which I was
> hoping to SAX-parse. Here's code and output. I have highlighted the
> output to make it easier to read.
>
> ? <#>
> |try|
> |{|
> |||InputStream inputStream = session.read( flowfile );|
> |||System.out.println( ||"This is the input stream first time around
> (before copying to output stream)..."| |);|
> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
> |||inputStream.close();|
> |}|
> |catch||( IOException e )|
> |{|
> |||e.printStackTrace();|
> |}|
> |flowfile = session.write( flowfile, ||new| |StreamCallback()|
> |{|
> |||@Override|
> |||public| |void| |process( InputStream inputStream, OutputStream
> outputStream ) ||throws| |IOException|
> |||{|
> |||System.out.println( ||"And now, let's copy..."| |);|
> |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
> outputStream );|
> |||}|
> |} );|
> |try|
> |{|
> |||InputStream inputStream = session.read( flowfile );|
> |||System.out.println( ||"This is the input stream second time around
> (after copying)..."| |);|
> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
> |||inputStream.close();|
> |}|
> |catch||( IOException e )|
> |{|
> |||e.printStackTrace();|
> |}|
> |// ...on to SAX parser which dies because the input has been truncated to|
> |// exactly what was written out to the output stream|
>
>
> Output of above:
>
> This is the input stream first time around (before copying to output
> stream)...
> <cxml>
>    <document>
>      This is the original document.
>    </document>
>    <metadata>
>      <date_of_service>2016-06-28 13:23</date_of_service>
>    </metadata>
>    <demographics>
>      <date_of_birth>1980-07-01</date_of_birth>
>      <age>36</age>
>    </demographics>
> </cxml>
>
> And now, let's copy...
> This is the input stream second time around (after copying)...
> <cxml>
>    <document>
>      This is the original document.
>    </document>
> And now, we'll go on to the SAX parser...
> <cxml> <document> This is the original document. </document>
> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
> document structures must start and end within the same entity.
>
>
> I left off the code that prints, "And now, we'll go on to the SAX
> parser..." It's in the next flowfile = session.write( ... ). I have unit
> tests that verify the good functioning of
> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
> "file" is truncated; SAX finds the first "half" just fine, but there is
> no second "half". If I comment out copying from input stream to output
> stream, the error doesn't occur--the whole document is there.
>
> Thanks for looking at this again if you can,
> Russ
>
> On 3/27/20 3:08 PM, Joe Witt wrote:
> > you should be able to call write as many times as you need.  just keep
> > using the resulting flowfile reference into the next call.
> >
> > On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <ru...@windofkeltia.com>
> > wrote:
> >
> >> Mike,
> >>
> >> Many thanks for responding. Do you mean to say that all I have to do is
> >> something like this?
> >>
> >>      public void onTrigger( final ProcessContext context, final
> >>      ProcessSession session ) throws ProcessException
> >>      {
> >>         FlowFile flowfile = session.get();
> >>         ...
> >>
> >>         // this is will be our resulting flowfile...
> >>         AtomicReference< OutputStream > savedOutputStream = new
> >>      AtomicReference<>();
> >>
> >>         /* Do some processing on the in-coming flowfile then close its
> >>      input stream, but
> >>          * save the output stream for continued use.
> >>          */
> >>      *  session.write( flowfile, new InputStreamCallback()*
> >>         {
> >>           @Override
> >>      *    public void process( InputStream inputStream, OutputStream
> >>      outputStream ) throws IOException*
> >>           {
> >>             savedOutputStream.set( outputStream );
> >>             ...
> >>
> >>             // processing puts some output on the output stream...
> >>             outputStream.write( etc. );
> >>
> >>             inputStream.close();
> >>           }
> >>      *  } );*
> >>
> >>         /* Start over doing different processing on the (same/reopened)
> >>      in-coming flowfile
> >>          * continuing to use the original output stream. It's our
> >>      responsibility to close
> >>          * the saved output stream, NiFi closes the unused output stream
> >>      opened, but
> >>          * ignored by us.
> >>          */
> >>      *  session.write( flowfile, new StreamCallback()*
> >>         {
> >>           @Override
> >>      *    public void process( InputStream inputStream, OutputStream
> >>      outputStream ) throws IOException*
> >>           {
> >>             outputStream = savedOutputStream.get(); // (discard the new
> >>      output stream)
> >>             ...
> >>
> >>             // processing puts (some more) output on the original output
> >>      stream...
> >>             outputStream.write( etc. );
> >>
> >>             outputStream.close();
> >>           }
> >>      *  } );*
> >>
> >>         session.transfer( flowfile, etc. );
> >>      }
> >>
> >> I'm wondering if this will work to "discard" the new output stream
> >> opened for me (the second time) and replace it with the original one
> >> which was probably closed when the first call to
> >> session.write()finished. What's on these streams is way too big for me
> >> to put them into temporary memory, say, a ByteArrayOutputStream.
> >>
> >> Russ
> >>
> >> On 3/27/20 10:03 AM, Mike Thomsen wrote:
> >>> session.read(FlowFile) just gives you an InputStream. You should be
> able
> >> to
> >>> rerun that as many times as you want provided you properly close it.
> >>>
> >>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
> russ@windofkeltia.com>
> >>> wrote:
> >>>
> >>>> In my custom processor, I'm using a SAX parser to process an incoming
> >>>> flowfile that's in XML. Except that, this particular XML is in essence
> >>>> two different files and I would like to split, read and process the
> >>>> first "half", which starts a couple of lines (XML elements) into the
> >>>> file) not using the SAX parser. At the end, I would stream the output
> of
> >>>> the first half, then the SAX-processed second half.
> >>>>
> >>>> So, in short:
> >>>>
> >>>>    1. process the incoming flowfile for the early content not using
> SAX,
> >>>>       but merely copying as-is; at all cost I must avoid
> "reassembling"
> >>>>       the first half using my SAX handler (what I'm doing now),
> >>>>    2. output the first part down the output stream to the resulting
> >> flowfile,
> >>>>    3. (re)process the incoming flowfile using SAX (and I can just skip
> >>>>       over the first bit) and spitting the result of this second part
> out
> >>>>       down the output stream of the resulting flowfile.
> >>>>
> >>>> I guess this is tantamount to asking how, in Java, I can read an input
> >>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
> >>>> developer question and more a Java question. I have looked at it that
> >>>> way too, but, if one of you knows (particularly NiFi) best practice, I
> >>>> would very much like to hear about it.
> >>>>
> >>>> Thanks.
> >>>>
> >>>>
> >>
>
>

Re: Reading the incoming flowfile "twice"

Posted by Russell Bateman <ru...@windofkeltia.com>.
Joe and Mike,

Sadly, I was not able to get very far on this. It seems that the extend 
to which I copy the first half of the contents of the input stream, I 
lose what comes after when I try to read again, basically, the second 
half comprising the <metadata>and <demographics>elements which I was 
hoping to SAX-parse. Here's code and output. I have highlighted the 
output to make it easier to read.

? <#>
|try|
|{|
|||InputStream inputStream = session.read( flowfile );|
|||System.out.println( ||"This is the input stream first time around 
(before copying to output stream)..."| |);|
|||System.out.println( StreamUtilities.fromStream( inputStream ) );|
|||inputStream.close();|
|}|
|catch||( IOException e )|
|{|
|||e.printStackTrace();|
|}|
|flowfile = session.write( flowfile, ||new| |StreamCallback()|
|{|
|||@Override|
|||public| |void| |process( InputStream inputStream, OutputStream 
outputStream ) ||throws| |IOException|
|||{|
|||System.out.println( ||"And now, let's copy..."| |);|
|||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream, 
outputStream );|
|||}|
|} );|
|try|
|{|
|||InputStream inputStream = session.read( flowfile );|
|||System.out.println( ||"This is the input stream second time around 
(after copying)..."| |);|
|||System.out.println( StreamUtilities.fromStream( inputStream ) );|
|||inputStream.close();|
|}|
|catch||( IOException e )|
|{|
|||e.printStackTrace();|
|}|
|// ...on to SAX parser which dies because the input has been truncated to|
|// exactly what was written out to the output stream|


Output of above:

This is the input stream first time around (before copying to output 
stream)...
<cxml>
   <document>
     This is the original document.
   </document>
   <metadata>
     <date_of_service>2016-06-28 13:23</date_of_service>
   </metadata>
   <demographics>
     <date_of_birth>1980-07-01</date_of_birth>
     <age>36</age>
   </demographics>
</cxml>

And now, let's copy...
This is the input stream second time around (after copying)...
<cxml>
   <document>
     This is the original document.
   </document>
And now, we'll go on to the SAX parser...
<cxml> <document> This is the original document. </document>
[pool-1-thread-1] ERROR [...] SAX ruleparser error: org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML document structures must start and end within the same entity.


I left off the code that prints, "And now, we'll go on to the SAX 
parser..." It's in the next flowfile = session.write( ... ). I have unit 
tests that verify the good functioning of 
copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the 
"file" is truncated; SAX finds the first "half" just fine, but there is 
no second "half". If I comment out copying from input stream to output 
stream, the error doesn't occur--the whole document is there.

Thanks for looking at this again if you can,
Russ

On 3/27/20 3:08 PM, Joe Witt wrote:
> you should be able to call write as many times as you need.  just keep
> using the resulting flowfile reference into the next call.
>
> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <ru...@windofkeltia.com>
> wrote:
>
>> Mike,
>>
>> Many thanks for responding. Do you mean to say that all I have to do is
>> something like this?
>>
>>      public void onTrigger( final ProcessContext context, final
>>      ProcessSession session ) throws ProcessException
>>      {
>>         FlowFile flowfile = session.get();
>>         ...
>>
>>         // this is will be our resulting flowfile...
>>         AtomicReference< OutputStream > savedOutputStream = new
>>      AtomicReference<>();
>>
>>         /* Do some processing on the in-coming flowfile then close its
>>      input stream, but
>>          * save the output stream for continued use.
>>          */
>>      *  session.write( flowfile, new InputStreamCallback()*
>>         {
>>           @Override
>>      *    public void process( InputStream inputStream, OutputStream
>>      outputStream ) throws IOException*
>>           {
>>             savedOutputStream.set( outputStream );
>>             ...
>>
>>             // processing puts some output on the output stream...
>>             outputStream.write( etc. );
>>
>>             inputStream.close();
>>           }
>>      *  } );*
>>
>>         /* Start over doing different processing on the (same/reopened)
>>      in-coming flowfile
>>          * continuing to use the original output stream. It's our
>>      responsibility to close
>>          * the saved output stream, NiFi closes the unused output stream
>>      opened, but
>>          * ignored by us.
>>          */
>>      *  session.write( flowfile, new StreamCallback()*
>>         {
>>           @Override
>>      *    public void process( InputStream inputStream, OutputStream
>>      outputStream ) throws IOException*
>>           {
>>             outputStream = savedOutputStream.get(); // (discard the new
>>      output stream)
>>             ...
>>
>>             // processing puts (some more) output on the original output
>>      stream...
>>             outputStream.write( etc. );
>>
>>             outputStream.close();
>>           }
>>      *  } );*
>>
>>         session.transfer( flowfile, etc. );
>>      }
>>
>> I'm wondering if this will work to "discard" the new output stream
>> opened for me (the second time) and replace it with the original one
>> which was probably closed when the first call to
>> session.write()finished. What's on these streams is way too big for me
>> to put them into temporary memory, say, a ByteArrayOutputStream.
>>
>> Russ
>>
>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>> session.read(FlowFile) just gives you an InputStream. You should be able
>> to
>>> rerun that as many times as you want provided you properly close it.
>>>
>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <ru...@windofkeltia.com>
>>> wrote:
>>>
>>>> In my custom processor, I'm using a SAX parser to process an incoming
>>>> flowfile that's in XML. Except that, this particular XML is in essence
>>>> two different files and I would like to split, read and process the
>>>> first "half", which starts a couple of lines (XML elements) into the
>>>> file) not using the SAX parser. At the end, I would stream the output of
>>>> the first half, then the SAX-processed second half.
>>>>
>>>> So, in short:
>>>>
>>>>    1. process the incoming flowfile for the early content not using SAX,
>>>>       but merely copying as-is; at all cost I must avoid "reassembling"
>>>>       the first half using my SAX handler (what I'm doing now),
>>>>    2. output the first part down the output stream to the resulting
>> flowfile,
>>>>    3. (re)process the incoming flowfile using SAX (and I can just skip
>>>>       over the first bit) and spitting the result of this second part out
>>>>       down the output stream of the resulting flowfile.
>>>>
>>>> I guess this is tantamount to asking how, in Java, I can read an input
>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>> developer question and more a Java question. I have looked at it that
>>>> way too, but, if one of you knows (particularly NiFi) best practice, I
>>>> would very much like to hear about it.
>>>>
>>>> Thanks.
>>>>
>>>>
>>


Re: Reading the incoming flowfile "twice"

Posted by Mike Thomsen <mi...@gmail.com>.
I prefer to avoid the callbacks and do this:

InputStream is = session.read(flowFile);
.... some code....
is.close();

On Fri, Mar 27, 2020 at 5:22 PM Russell Bateman <ru...@windofkeltia.com>
wrote:

> Joe,
>
> Ah, thanks. I think I have learned a lot about what's going on down
> inside session.read/write()today. I don't have to stand on my head. For
> completeness if anyone else looks for this answer, here's my code amended:
>
> public void onTrigger( final ProcessContext context, final ProcessSession
> session ) throws ProcessException
> {
>    FlowFile flowfile = session.get();
>    ...
>
>    // Do some processing on the in-coming flowfile then close its input
> stream
>    flowfile = session.write( flowfile, new InputStreamCallback()
>    {
>      @Override
>      public void process( InputStream inputStream, OutputStream
> outputStream ) throws IOException
>      {
>        ...
>
>        // processing puts some output on the output stream...
>        outputStream.write( etc. );
>
>        inputStream.close();
>      }
>    } );
>
>    // Start over doing different processing on the (same/reopened)
> in-coming flowfile
>    // continuing to use the (same, also reopened, but appended to) output
> stream.
>    flowfile = session.write( flowfile, new StreamCallback()
>    {
>      @Override
>      public void process( InputStream inputStream, OutputStream
> outputStream ) throws IOException
>      {
>        ...
>
>        // processing puts (some more) output on the flowfile's output
> stream...
>        outputStream.write( etc. );
>      }
>    } );
>
>    session.transfer( flowfile, etc. );
> }
>
> As I'm fond of saying, NiFi just rocks because there's always a solution!
>
> Russ
>
> On 3/27/20 3:08 PM, Joe Witt wrote:
> > you should be able to call write as many times as you need.  just keep
> > using the resulting flowfile reference into the next call.
> >
> > On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <ru...@windofkeltia.com>
> > wrote:
> >
> >> Mike,
> >>
> >> Many thanks for responding. Do you mean to say that all I have to do is
> >> something like this?
> >>
> >>      public void onTrigger( final ProcessContext context, final
> >>      ProcessSession session ) throws ProcessException
> >>      {
> >>         FlowFile flowfile = session.get();
> >>         ...
> >>
> >>         // this is will be our resulting flowfile...
> >>         AtomicReference< OutputStream > savedOutputStream = new
> >>      AtomicReference<>();
> >>
> >>         /* Do some processing on the in-coming flowfile then close its
> >>      input stream, but
> >>          * save the output stream for continued use.
> >>          */
> >>      *  session.write( flowfile, new InputStreamCallback()*
> >>         {
> >>           @Override
> >>      *    public void process( InputStream inputStream, OutputStream
> >>      outputStream ) throws IOException*
> >>           {
> >>             savedOutputStream.set( outputStream );
> >>             ...
> >>
> >>             // processing puts some output on the output stream...
> >>             outputStream.write( etc. );
> >>
> >>             inputStream.close();
> >>           }
> >>      *  } );*
> >>
> >>         /* Start over doing different processing on the (same/reopened)
> >>      in-coming flowfile
> >>          * continuing to use the original output stream. It's our
> >>      responsibility to close
> >>          * the saved output stream, NiFi closes the unused output stream
> >>      opened, but
> >>          * ignored by us.
> >>          */
> >>      *  session.write( flowfile, new StreamCallback()*
> >>         {
> >>           @Override
> >>      *    public void process( InputStream inputStream, OutputStream
> >>      outputStream ) throws IOException*
> >>           {
> >>             outputStream = savedOutputStream.get(); // (discard the new
> >>      output stream)
> >>             ...
> >>
> >>             // processing puts (some more) output on the original output
> >>      stream...
> >>             outputStream.write( etc. );
> >>
> >>             outputStream.close();
> >>           }
> >>      *  } );*
> >>
> >>         session.transfer( flowfile, etc. );
> >>      }
> >>
> >> I'm wondering if this will work to "discard" the new output stream
> >> opened for me (the second time) and replace it with the original one
> >> which was probably closed when the first call to
> >> session.write()finished. What's on these streams is way too big for me
> >> to put them into temporary memory, say, a ByteArrayOutputStream.
> >>
> >> Russ
> >>
> >> On 3/27/20 10:03 AM, Mike Thomsen wrote:
> >>> session.read(FlowFile) just gives you an InputStream. You should be
> able
> >> to
> >>> rerun that as many times as you want provided you properly close it.
> >>>
> >>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <
> russ@windofkeltia.com>
> >>> wrote:
> >>>
> >>>> In my custom processor, I'm using a SAX parser to process an incoming
> >>>> flowfile that's in XML. Except that, this particular XML is in essence
> >>>> two different files and I would like to split, read and process the
> >>>> first "half", which starts a couple of lines (XML elements) into the
> >>>> file) not using the SAX parser. At the end, I would stream the output
> of
> >>>> the first half, then the SAX-processed second half.
> >>>>
> >>>> So, in short:
> >>>>
> >>>>    1. process the incoming flowfile for the early content not using
> SAX,
> >>>>       but merely copying as-is; at all cost I must avoid
> "reassembling"
> >>>>       the first half using my SAX handler (what I'm doing now),
> >>>>    2. output the first part down the output stream to the resulting
> >> flowfile,
> >>>>    3. (re)process the incoming flowfile using SAX (and I can just skip
> >>>>       over the first bit) and spitting the result of this second part
> out
> >>>>       down the output stream of the resulting flowfile.
> >>>>
> >>>> I guess this is tantamount to asking how, in Java, I can read an input
> >>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
> >>>> developer question and more a Java question. I have looked at it that
> >>>> way too, but, if one of you knows (particularly NiFi) best practice, I
> >>>> would very much like to hear about it.
> >>>>
> >>>> Thanks.
> >>>>
> >>>>
> >>
>
>

Re: Reading the incoming flowfile "twice"

Posted by Russell Bateman <ru...@windofkeltia.com>.
Joe,

Ah, thanks. I think I have learned a lot about what's going on down 
inside session.read/write()today. I don't have to stand on my head. For 
completeness if anyone else looks for this answer, here's my code amended:

public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
   FlowFile flowfile = session.get();
   ...

   // Do some processing on the in-coming flowfile then close its input stream
   flowfile = session.write( flowfile, new InputStreamCallback()
   {
     @Override
     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
     {
       ...

       // processing puts some output on the output stream...
       outputStream.write( etc. );

       inputStream.close();
     }
   } );

   // Start over doing different processing on the (same/reopened) in-coming flowfile
   // continuing to use the (same, also reopened, but appended to) output stream.
   flowfile = session.write( flowfile, new StreamCallback()
   {
     @Override
     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
     {
       ...

       // processing puts (some more) output on the flowfile's output stream...
       outputStream.write( etc. );
     }
   } );

   session.transfer( flowfile, etc. );
}

As I'm fond of saying, NiFi just rocks because there's always a solution!

Russ

On 3/27/20 3:08 PM, Joe Witt wrote:
> you should be able to call write as many times as you need.  just keep
> using the resulting flowfile reference into the next call.
>
> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <ru...@windofkeltia.com>
> wrote:
>
>> Mike,
>>
>> Many thanks for responding. Do you mean to say that all I have to do is
>> something like this?
>>
>>      public void onTrigger( final ProcessContext context, final
>>      ProcessSession session ) throws ProcessException
>>      {
>>         FlowFile flowfile = session.get();
>>         ...
>>
>>         // this is will be our resulting flowfile...
>>         AtomicReference< OutputStream > savedOutputStream = new
>>      AtomicReference<>();
>>
>>         /* Do some processing on the in-coming flowfile then close its
>>      input stream, but
>>          * save the output stream for continued use.
>>          */
>>      *  session.write( flowfile, new InputStreamCallback()*
>>         {
>>           @Override
>>      *    public void process( InputStream inputStream, OutputStream
>>      outputStream ) throws IOException*
>>           {
>>             savedOutputStream.set( outputStream );
>>             ...
>>
>>             // processing puts some output on the output stream...
>>             outputStream.write( etc. );
>>
>>             inputStream.close();
>>           }
>>      *  } );*
>>
>>         /* Start over doing different processing on the (same/reopened)
>>      in-coming flowfile
>>          * continuing to use the original output stream. It's our
>>      responsibility to close
>>          * the saved output stream, NiFi closes the unused output stream
>>      opened, but
>>          * ignored by us.
>>          */
>>      *  session.write( flowfile, new StreamCallback()*
>>         {
>>           @Override
>>      *    public void process( InputStream inputStream, OutputStream
>>      outputStream ) throws IOException*
>>           {
>>             outputStream = savedOutputStream.get(); // (discard the new
>>      output stream)
>>             ...
>>
>>             // processing puts (some more) output on the original output
>>      stream...
>>             outputStream.write( etc. );
>>
>>             outputStream.close();
>>           }
>>      *  } );*
>>
>>         session.transfer( flowfile, etc. );
>>      }
>>
>> I'm wondering if this will work to "discard" the new output stream
>> opened for me (the second time) and replace it with the original one
>> which was probably closed when the first call to
>> session.write()finished. What's on these streams is way too big for me
>> to put them into temporary memory, say, a ByteArrayOutputStream.
>>
>> Russ
>>
>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>> session.read(FlowFile) just gives you an InputStream. You should be able
>> to
>>> rerun that as many times as you want provided you properly close it.
>>>
>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <ru...@windofkeltia.com>
>>> wrote:
>>>
>>>> In my custom processor, I'm using a SAX parser to process an incoming
>>>> flowfile that's in XML. Except that, this particular XML is in essence
>>>> two different files and I would like to split, read and process the
>>>> first "half", which starts a couple of lines (XML elements) into the
>>>> file) not using the SAX parser. At the end, I would stream the output of
>>>> the first half, then the SAX-processed second half.
>>>>
>>>> So, in short:
>>>>
>>>>    1. process the incoming flowfile for the early content not using SAX,
>>>>       but merely copying as-is; at all cost I must avoid "reassembling"
>>>>       the first half using my SAX handler (what I'm doing now),
>>>>    2. output the first part down the output stream to the resulting
>> flowfile,
>>>>    3. (re)process the incoming flowfile using SAX (and I can just skip
>>>>       over the first bit) and spitting the result of this second part out
>>>>       down the output stream of the resulting flowfile.
>>>>
>>>> I guess this is tantamount to asking how, in Java, I can read an input
>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>> developer question and more a Java question. I have looked at it that
>>>> way too, but, if one of you knows (particularly NiFi) best practice, I
>>>> would very much like to hear about it.
>>>>
>>>> Thanks.
>>>>
>>>>
>>


Re: Reading the incoming flowfile "twice"

Posted by Joe Witt <jo...@gmail.com>.
you should be able to call write as many times as you need.  just keep
using the resulting flowfile reference into the next call.

On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <ru...@windofkeltia.com>
wrote:

> Mike,
>
> Many thanks for responding. Do you mean to say that all I have to do is
> something like this?
>
>     public void onTrigger( final ProcessContext context, final
>     ProcessSession session ) throws ProcessException
>     {
>        FlowFile flowfile = session.get();
>        ...
>
>        // this is will be our resulting flowfile...
>        AtomicReference< OutputStream > savedOutputStream = new
>     AtomicReference<>();
>
>        /* Do some processing on the in-coming flowfile then close its
>     input stream, but
>         * save the output stream for continued use.
>         */
>     *  session.write( flowfile, new InputStreamCallback()*
>        {
>          @Override
>     *    public void process( InputStream inputStream, OutputStream
>     outputStream ) throws IOException*
>          {
>            savedOutputStream.set( outputStream );
>            ...
>
>            // processing puts some output on the output stream...
>            outputStream.write( etc. );
>
>            inputStream.close();
>          }
>     *  } );*
>
>        /* Start over doing different processing on the (same/reopened)
>     in-coming flowfile
>         * continuing to use the original output stream. It's our
>     responsibility to close
>         * the saved output stream, NiFi closes the unused output stream
>     opened, but
>         * ignored by us.
>         */
>     *  session.write( flowfile, new StreamCallback()*
>        {
>          @Override
>     *    public void process( InputStream inputStream, OutputStream
>     outputStream ) throws IOException*
>          {
>            outputStream = savedOutputStream.get(); // (discard the new
>     output stream)
>            ...
>
>            // processing puts (some more) output on the original output
>     stream...
>            outputStream.write( etc. );
>
>            outputStream.close();
>          }
>     *  } );*
>
>        session.transfer( flowfile, etc. );
>     }
>
> I'm wondering if this will work to "discard" the new output stream
> opened for me (the second time) and replace it with the original one
> which was probably closed when the first call to
> session.write()finished. What's on these streams is way too big for me
> to put them into temporary memory, say, a ByteArrayOutputStream.
>
> Russ
>
> On 3/27/20 10:03 AM, Mike Thomsen wrote:
> > session.read(FlowFile) just gives you an InputStream. You should be able
> to
> > rerun that as many times as you want provided you properly close it.
> >
> > On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <ru...@windofkeltia.com>
> > wrote:
> >
> >> In my custom processor, I'm using a SAX parser to process an incoming
> >> flowfile that's in XML. Except that, this particular XML is in essence
> >> two different files and I would like to split, read and process the
> >> first "half", which starts a couple of lines (XML elements) into the
> >> file) not using the SAX parser. At the end, I would stream the output of
> >> the first half, then the SAX-processed second half.
> >>
> >> So, in short:
> >>
> >>   1. process the incoming flowfile for the early content not using SAX,
> >>      but merely copying as-is; at all cost I must avoid "reassembling"
> >>      the first half using my SAX handler (what I'm doing now),
> >>   2. output the first part down the output stream to the resulting
> flowfile,
> >>   3. (re)process the incoming flowfile using SAX (and I can just skip
> >>      over the first bit) and spitting the result of this second part out
> >>      down the output stream of the resulting flowfile.
> >>
> >> I guess this is tantamount to asking how, in Java, I can read an input
> >> stream twice (or one-half plus one times). Maybe it's less a NiFi
> >> developer question and more a Java question. I have looked at it that
> >> way too, but, if one of you knows (particularly NiFi) best practice, I
> >> would very much like to hear about it.
> >>
> >> Thanks.
> >>
> >>
>
>

Re: Reading the incoming flowfile "twice"

Posted by Russell Bateman <ru...@windofkeltia.com>.
Mike,

Many thanks for responding. Do you mean to say that all I have to do is 
something like this?

    public void onTrigger( final ProcessContext context, final
    ProcessSession session ) throws ProcessException
    {
       FlowFile flowfile = session.get();
       ...

       // this is will be our resulting flowfile...
       AtomicReference< OutputStream > savedOutputStream = new
    AtomicReference<>();

       /* Do some processing on the in-coming flowfile then close its
    input stream, but
        * save the output stream for continued use.
        */
    *  session.write( flowfile, new InputStreamCallback()*
       {
         @Override
    *    public void process( InputStream inputStream, OutputStream
    outputStream ) throws IOException*
         {
           savedOutputStream.set( outputStream );
           ...

           // processing puts some output on the output stream...
           outputStream.write( etc. );

           inputStream.close();
         }
    *  } );*

       /* Start over doing different processing on the (same/reopened)
    in-coming flowfile
        * continuing to use the original output stream. It's our
    responsibility to close
        * the saved output stream, NiFi closes the unused output stream
    opened, but
        * ignored by us.
        */
    *  session.write( flowfile, new StreamCallback()*
       {
         @Override
    *    public void process( InputStream inputStream, OutputStream
    outputStream ) throws IOException*
         {
           outputStream = savedOutputStream.get(); // (discard the new
    output stream)
           ...

           // processing puts (some more) output on the original output
    stream...
           outputStream.write( etc. );

           outputStream.close();
         }
    *  } );*

       session.transfer( flowfile, etc. );
    }

I'm wondering if this will work to "discard" the new output stream 
opened for me (the second time) and replace it with the original one 
which was probably closed when the first call to 
session.write()finished. What's on these streams is way too big for me 
to put them into temporary memory, say, a ByteArrayOutputStream.

Russ

On 3/27/20 10:03 AM, Mike Thomsen wrote:
> session.read(FlowFile) just gives you an InputStream. You should be able to
> rerun that as many times as you want provided you properly close it.
>
> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <ru...@windofkeltia.com>
> wrote:
>
>> In my custom processor, I'm using a SAX parser to process an incoming
>> flowfile that's in XML. Except that, this particular XML is in essence
>> two different files and I would like to split, read and process the
>> first "half", which starts a couple of lines (XML elements) into the
>> file) not using the SAX parser. At the end, I would stream the output of
>> the first half, then the SAX-processed second half.
>>
>> So, in short:
>>
>>   1. process the incoming flowfile for the early content not using SAX,
>>      but merely copying as-is; at all cost I must avoid "reassembling"
>>      the first half using my SAX handler (what I'm doing now),
>>   2. output the first part down the output stream to the resulting flowfile,
>>   3. (re)process the incoming flowfile using SAX (and I can just skip
>>      over the first bit) and spitting the result of this second part out
>>      down the output stream of the resulting flowfile.
>>
>> I guess this is tantamount to asking how, in Java, I can read an input
>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>> developer question and more a Java question. I have looked at it that
>> way too, but, if one of you knows (particularly NiFi) best practice, I
>> would very much like to hear about it.
>>
>> Thanks.
>>
>>


Re: Reading the incoming flowfile "twice"

Posted by Mike Thomsen <mi...@gmail.com>.
session.read(FlowFile) just gives you an InputStream. You should be able to
rerun that as many times as you want provided you properly close it.

On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <ru...@windofkeltia.com>
wrote:

> In my custom processor, I'm using a SAX parser to process an incoming
> flowfile that's in XML. Except that, this particular XML is in essence
> two different files and I would like to split, read and process the
> first "half", which starts a couple of lines (XML elements) into the
> file) not using the SAX parser. At the end, I would stream the output of
> the first half, then the SAX-processed second half.
>
> So, in short:
>
>  1. process the incoming flowfile for the early content not using SAX,
>     but merely copying as-is; at all cost I must avoid "reassembling"
>     the first half using my SAX handler (what I'm doing now),
>  2. output the first part down the output stream to the resulting flowfile,
>  3. (re)process the incoming flowfile using SAX (and I can just skip
>     over the first bit) and spitting the result of this second part out
>     down the output stream of the resulting flowfile.
>
> I guess this is tantamount to asking how, in Java, I can read an input
> stream twice (or one-half plus one times). Maybe it's less a NiFi
> developer question and more a Java question. I have looked at it that
> way too, but, if one of you knows (particularly NiFi) best practice, I
> would very much like to hear about it.
>
> Thanks.
>
>