Posted to dev@nifi.apache.org by Russell Bateman <ru...@windofkeltia.com> on 2020/08/24 22:37:22 UTC

From one flowfile to two...

I am writing a custom processor that, upon processing a flowfile,
results in two new flowfiles (neither keeping the exact, original
content) sent out two different relationships. I might like to route
the original flowfile to a separate relationship.

FlowFile original = session.get();

Do I need to call session.create() for the two new files?

 1. session.read() of original file's contents, not all of the way
    through, but send the processed output from what I do read as
    flowfile 1.
 2. session.read() of original file's contents and send resulting output
    as flowfile 2.
 3. session.transfer() of original flowfile.

I look at all of these session.read() and session.write() calls and I'm a
bit confused as to which to use so that I don't lose the original flowfile's
content after #1 and can start over again in #2.
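
Roughly, the shape I have in mind, with REL_ONE, REL_TWO and REL_ORIGINAL
standing in for my relationships:

FlowFile original = session.get();

// is session.create() how the two new files should come into being?
FlowFile first  = session.create( original );
FlowFile second = session.create( original );

// ... read original's content, writing the two processed outputs to first and second ...

session.transfer( first,    REL_ONE );
session.transfer( second,   REL_TWO );
session.transfer( original, REL_ORIGINAL );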

Thanks.

Re: From one flowfile to two...

Posted by Russell Bateman <ru...@windofkeltia.com>.
I will be sure to do that. I keep several pages of NiFi notes on my 
website (javahotchocolate.com). The notes are mostly for me to 
re-consult, but I have tutorials about writing custom processors and the 
like. I'll be putting out a skeletal copy of my recent code soon.

Thanks!

On 8/27/20 11:15 AM, Andy LoPresto wrote:
> Russell,
>
> Glad you found a working solution. Maybe it would be better for you to write up your findings and share them with a broader audience. I have often seen the best explanations are written by people who were recently in the “how do I do X?” state, as they are closest to the problem and can walk through their process of gathering understanding. Someone who works on these methods day in and day out may not write for the appropriate audience or explain the experience as well.


Re: From one flowfile to two...

Posted by Otto Fowler <ot...@gmail.com>.
 Thanks for writing that up!

On August 27, 2020 at 17:49:07, Russell Bateman (russ@windofkeltia.com) wrote:

In case anyone cares,
https://www.javahotchocolate.com/notes/nifi-custom.html#two-split-from-one


Re: From one flowfile to two...

Posted by Russell Bateman <ru...@windofkeltia.com>.
In case anyone cares,
https://www.javahotchocolate.com/notes/nifi-custom.html#two-split-from-one



Re: From one flowfile to two...

Posted by Andy LoPresto <al...@apache.org>.
Russell,

Glad you found a working solution. Maybe it would be better for you to write up your findings and share them with a broader audience. I have often seen the best explanations are written by people who were recently in the “how do I do X?” state, as they are closest to the problem and can walk through their process of gathering understanding. Someone who works on these methods day in and day out may not write for the appropriate audience or explain the experience as well. 

Andy LoPresto
alopresto@apache.org
alopresto.apache@gmail.com
He/Him
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69



Re: From one flowfile to two...

Posted by Russell Bateman <ru...@windofkeltia.com>.
I needed to get back here...

I took this advice to heart and finished my processor. Thanks to Matt 
and Mark for all their suggestions! They cleared up a few things. There 
was one bug in the code that was mine, small, but significant in its 
effect on the rest. That mistake also explained why I thought the 
uuid was identical between at least two of the cloned flowfiles. What I 
would wish for, and am probably not strong enough to write, would be a 
synthesis of the session methods read() and write() and how best to use 
them (one-to-one, one-to-many, etc.). Javadoc is too paratactic by 
nature, and the NiFi Developer's Guide is almost silent on these methods. If it 
were not for the many existing examples using these methods, it would be 
hard to learn to do even simple things. I did look for something closer 
to what I needed to do, but unsuccessfully.

Thanks again. If anything, the NiFi mailing lists are a place both for 
great information and for being treated well.

Russ



Re: From one flowfile to two...

Posted by Mark Payne <ma...@hotmail.com>.
Russ,

Several comments here. I’ve included them inline, below.

Hope it’s helpful.

Thanks
-Mark


> On Aug 25, 2020, at 2:09 PM, Russell Bateman <ru...@windofkeltia.com> wrote:
> 
> Thanks for your suggestions, Matt.
> 
> I decided to keep the original flowfile only upon failure. So, I have the embedded-document file and the serialized POJOs created from processing the non embedded-document part as the result if successful. (Condensed code at end...)
> 
> Now I have three questions...
> 
> 1. I seem not to have placated NiFi with the assurance that I have transferred or disposed of all three flowfiles suitably. I get:
> 
> java.lang.AssertionError: org.apache.nifi.processor.exception.FlowFileHandlingException: Cannot commit session because the following FlowFiles have not been removed or transferred: [2]

>> This is probably because at the end of the block, you catch Exception and then route the original FlowFile to failure. But you’ve already cloned it and didn’t deal with the clone.
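
For instance, something along these lines on the failure path, using the variable names from your code; alternatively, since the processor presumably extends AbstractProcessor, you can simply let the exception propagate and the framework will roll the whole session back, returning the original to its input queue:

FlowFile document = null;
FlowFile concepts = null;
try
{
    document = session.clone( flowfile );
    // ... write the embedded document ...
    concepts = session.clone( flowfile );
    // ... read/parse/write the concepts, transfer both, remove the original ...
}
catch( Exception e )
{
    // every FlowFile created in this session must be removed or transferred before commit
    if( document != null ) session.remove( document );
    if( concepts != null ) session.remove( concepts );
    session.transfer( flowfile, FAILURE );
}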

> 
> Which of the three flowfiles does [2] refer to? Or does it just mean I botched two flowfiles?
> 
> 2. session.clone() generates a new flowfile with the identical uuid. I don't think I want the result to be two flowfiles with the same uuid. I am binding them together so I can associate them later using the attribute embedded-document. Should I, and how do I, force cloning to acquire new uuids?

>>> This appears to actually be a bug in the mock framework. It *should* have a unique uuid, and would in a running NiFi instance. Feel free to file a Jira for that.
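
If it helps to see what the mock framework is actually handing back, a quick look in the unit test would be something like this, where MyProcessor, DOCUMENT and CONCEPTS stand in for your own processor and relationships:

final TestRunner runner = TestRunners.newTestRunner( new MyProcessor() );
runner.enqueue( "some test content".getBytes() );
runner.run();

final MockFlowFile document = runner.getFlowFilesForRelationship( MyProcessor.DOCUMENT ).get( 0 );
final MockFlowFile concepts = runner.getFlowFilesForRelationship( MyProcessor.CONCEPTS ).get( 0 );

// in a running NiFi instance these should differ; if they come back equal here, that's the mock bug
System.out.println( document.getAttribute( CoreAttributes.UUID.key() ) );
System.out.println( concepts.getAttribute( CoreAttributes.UUID.key() ) );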

> 
> 3. A question on theory... Wouldn't all of this cloning be expensive, such that I should just clone for one of the new files and then mangle the original flowfile to become the other?

>> session.clone() is not particularly expensive. It’s just creating a new FlowFile object. It doesn’t clone the FlowFile’s contents.

That said, it is probably more appropriate to call session.create(flowFile), rather than session.clone(flowFile). It makes little difference in practice but what you’re really doing is forking a child, and that will come across more cleanly in the Provenance lineage that is generated if using session.create(flowFile).
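
Side by side, the two calls look like this; roughly speaking, clone() is recorded in provenance as a CLONE event while create(parent) comes out as a FORK:

FlowFile copy  = session.clone( flowfile );   // same attributes (new uuid) and the same content as the parent
FlowFile child = session.create( flowfile );  // same attributes (new uuid), but no content until you write to it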

Additional comments in code below.


> 
> Thanks,
> Russ
> 
> 
> @Override
> public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
> {
>   FlowFile flowfile = session.get();
> 
>   if( flowfile == null )
>   {
>     context.yield();
>>> No need to yield here. Let the framework handle the scheduling. ProcessContext.yield() is meant for cases where you’re communicating with some external service, for instance, and you know the service is unavailable or rate limiting you or something like that. You can’t make any progress, so tell NiFi to not bother wasting CPU cycles with this Processor.

>     return;
>   }
> 
>   try
>   {
>     final String UUID = flowfile.getAttribute( NiFiUtilities.UUID );
> 
>     FlowFile document = session.clone( flowfile );
> 
>     // excerpt and write the embedded document to a new flowfile...
>     session.write( document, new OutputStreamCallback()
>     {
>       @Override public void process( OutputStream outputStream )
>       {
>         // read from the original flowfile copying to the output flowfile...
>         session.read( flowfile, new InputStreamCallback()
>         {
>           @Override public void process( InputStream inputStream ) throws IOException
>           {
>            ...
>           }
>         } );
>       }
>     } );
> 
>     FlowFile concepts = session.clone( flowfile );
> 
>     AtomicReference< ConceptList > conceptListHolder = new AtomicReference<>();
> 
>     // parse the concepts into a POJO list...
>     session.read( concepts, new InputStreamCallback()
>     {
>       final ConceptList conceptList = conceptListHolder.get();
> 
>       @Override public void process( InputStream inputStream ) throws IOException
>       {
>         ...
>       }
>     } );
> 
>     // write out the concept POJOs serialized...
>     session.write( concepts, new OutputStreamCallback()
>     {
>       @Override public void process( OutputStream outputStream )
>       {
>         ...
>       }
>     } );

>> At this point, you’ve written to the ‘document’ flowfile once, written to the ‘concepts’ flowfile once and read the original FlowFile twice (well read the original flowfile once and read the clone once, which amounts to the same thing).
You could instead do something like:

FlowFile document = session.create(flowFile);
FlowFile concepts = session.create(flowFile);

try (final InputStream input = session.read(flowFile)) {
    try (final OutputStream documentOut = session.write(document);
          final OutputStream conceptOut = session.write(concepts)) {

         // Perform processing.

    }
}

In this way, you avoid reading the input FlowFile twice. Of course, you provided an abstraction of the code, so it’s possible that this won’t actually work, depending on what you’re doing to read the input...
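
And if streaming both outputs at once doesn't fit the way your parser works, a more pedestrian shape that still makes only one pass over the original is to gather what you need during a single read and then write each child, reassigning the FlowFile references as you go. (This buffers the extracted document in memory, so it's only reasonable if that document isn't huge.) Picking up right after session.get():

final String uuid = flowfile.getAttribute( CoreAttributes.UUID.key() );

FlowFile document = session.create( flowfile );
FlowFile concepts = session.create( flowfile );

final ByteArrayOutputStream documentBytes = new ByteArrayOutputStream();
final AtomicReference< ConceptList > conceptsRef = new AtomicReference<>();

// one pass over the original: capture the embedded document and build the concept list
session.read( flowfile, new InputStreamCallback()
{
    @Override public void process( InputStream inputStream ) throws IOException
    {
        // ... copy the embedded document into documentBytes and populate conceptsRef ...
    }
} );

document = session.write( document, new OutputStreamCallback()
{
    @Override public void process( OutputStream outputStream ) throws IOException
    {
        outputStream.write( documentBytes.toByteArray() );
    }
} );

concepts = session.write( concepts, new OutputStreamCallback()
{
    @Override public void process( OutputStream outputStream ) throws IOException
    {
        // ... serialize conceptsRef.get() ...
    }
} );

document = session.putAttribute( document, "embedded-document", uuid );
concepts = session.putAttribute( concepts, "embedded-document", uuid );
session.transfer( document, DOCUMENT );
session.transfer( concepts, CONCEPTS );
session.remove( flowfile );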

> 
>     document = session.putAttribute( document, "embedded-document", UUID );
>     concepts = session.putAttribute( document, "embedded-document", UUID );
>     session.transfer( document, DOCUMENT );
>     session.transfer( concepts, CONCEPTS );
>     session.remove( flowfile );
>   }
>   catch( Exception e )
>   {
>     session.transfer( flowfile, FAILURE );
>   }
> }


Re: From one flowfile to two...

Posted by Russell Bateman <ru...@windofkeltia.com>.
Thanks for your suggestions, Matt.

I decided to keep the original flowfile only upon failure. So, I have 
the embedded-document file and the serialized POJOs created from 
processing the non embedded-document part as the result if successful. 
(Condensed code at end...)

Now I have three questions...

1. I seem not to have placated NiFi with the assurance that I have 
transferred or disposed of all three flowfiles suitably. I get:

java.lang.AssertionError: 
org.apache.nifi.processor.exception.FlowFileHandlingException: Cannot 
commit session because the following FlowFiles have not been removed or 
transferred: [2]

Which of the three flowfiles does [2] refer to? Or does it just mean I botched two flowfiles?

2. session.clone() generates a new flowfile with the identical uuid. I don't think I want the result to be two flowfiles with the same uuid. I am binding them together so I can associate them later using the attribute embedded-document. Should I, and how do I, force cloning to acquire new uuids?

3. A question on theory... Wouldn't all of this cloning be expensive, such that I should just clone for one of the new files and then mangle the original flowfile to become the other?

Thanks,
Russ


@Override
public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
   FlowFile flowfile = session.get();

   if( flowfile == null )
   {
     context.yield();
     return;
   }

   try
   {
     final String UUID = flowfile.getAttribute( NiFiUtilities.UUID );

     FlowFile document = session.clone( flowfile );

     // excerpt and write the embedded document to a new flowfile...
     session.write( document, new OutputStreamCallback()
     {
       @Override public void process( OutputStream outputStream )
       {
         // read from the original flowfile copying to the output flowfile...
         session.read( flowfile, new InputStreamCallback()
         {
           @Override public void process( InputStream inputStream ) throws IOException
           {
            ...
           }
         } );
       }
     } );

     FlowFile concepts = session.clone( flowfile );

     AtomicReference< ConceptList > conceptListHolder = new AtomicReference<>();

     // parse the concepts into a POJO list...
     session.read( concepts, new InputStreamCallback()
     {
       final ConceptList conceptList = conceptListHolder.get();

       @Override public void process( InputStream inputStream ) throws IOException
       {
         ...
       }
     } );

     // write out the concept POJOs serialized...
     session.write( concepts, new OutputStreamCallback()
     {
       @Override public void process( OutputStream outputStream )
       {
         ...
       }
     } );

     document = session.putAttribute( document, "embedded-document", UUID );
     concepts = session.putAttribute( document, "embedded-document", UUID );
     session.transfer( document, DOCUMENT );
     session.transfer( concepts, CONCEPTS );
     session.remove( flowfile );
   }
   catch( Exception e )
   {
     session.transfer( flowfile, FAILURE );
   }
}



Re: From one flowfile to two...

Posted by Matt Burgess <ma...@apache.org>.
Russell,

session.read() won't overwrite any contents of the incoming flow file,
but write() will. For #2, are you doing any processing on the file? If
not, wouldn't that be the original flowfile anyway? Or do you want it
to be a different flowfile on purpose (so you can send the incoming
flowfile to a different relationship)? You can use session.clone() to
create a new flowfile that has the same content and attributes from
the incoming flowfile, then handle that separately from the incoming
(original) flowfile. For #1, you could clone() the original flowfile
and do the read/process/write as part of a session.write(FlowFile,
StreamCallback) call, then you're technically reading the "new" file
content (which is the same of course) and overwriting it on the way
out.
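
For example, for #1 and #3, something like this, where REL_ONE and REL_ORIGINAL are just placeholder relationships:

FlowFile original = session.get();
if( original == null )
{
    return;
}

// the clone starts out with the original's content, so the callback below reads that copy
FlowFile first = session.clone( original );

first = session.write( first, new StreamCallback()
{
    @Override public void process( InputStream in, OutputStream out ) throws IOException
    {
        // read as much of the clone's content as needed and write the processed form of it
    }
} );

// a second clone, handled the same way, gives you flowfile 2
session.transfer( first, REL_ONE );
session.transfer( original, REL_ORIGINAL );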

Regards,
Matt
