You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by Devin Fisher <de...@perfectsearchcorp.com> on 2016/03/11 23:29:16 UTC

Split Content (One-to-Many) early commit

I'm creating a processor that will read a customer csv and will create a
new flowfile for each line in the form of XML. The CSV file will be quite
large (100s of thousands of lines). I would like to commit a reasonable
amount from time to time so that they can flow down to other processors.
But looking at similar processors SplitText and SplitXml they save up all
the created flowfiles and release them all at the end.  In some trials, I'm
running out of memory doing that. But I can't commit the session early
because I'm still reading the original CSV file.  Is there a workflow where
I can read the incoming CSV flowfile but still release created flowfiles?
I'm thinking of not using AbstractProcessor and instead
use AbstractSessionFactoryProcessor and create two different sessions but
is that advisable or possible?

Devin

Re: Split Content (One-to-Many) early commit

Posted by Devin Fisher <de...@perfectsearchcorp.com>.
Thanks for the response. It gave my additional context such that I can
proceed with more confidence. I've been playing with creating additional
sessions and I believe that it will work for my use case. But I will keep
the duplicate flowfile potential issue in mind, though.

Thanks again.

Devin

On Sun, Mar 13, 2016 at 12:40 PM, Mark Payne <ma...@hotmail.com> wrote:

> Devin,
>
> We do realize that we have some work to do in order to make it so that
> a single Processor can buffer up hundreds of thousands or more FlowFiles.
> The SplitText processor is very popular and suffers from this exact same
> problem.
> We want to have a mechanism for swapping those out of the Java Heap,
> similar
> to how we do when we have millions of FlowFiles sitting in a queue. There
> is a ticket
> here [1] to address this. However, this has turned out to be very time
> consuming, and
> not quite a straight-forward as we had hoped, so it's not been finished up
> yet.
>
> In the meantime, you can use the approach that you described, using two
> different
> Process Sessions, by extending AbstractSessionFactoryProcessor instead of
> AbstractProcessor. The downside to this approach, though, is that when
> NiFi is restarted,
> you could potentially have a lot of data duplication.
>
> As an example, let's imagine that you create a ProcessSession and use it
> to create 10,000 FlowFiles
> and then commit the session and create a new one. If you have an incoming
> FlowFiles that has
> 1 million rows in it, you may create 800,000 FlowFiles and send them out
> and then NiFi gets restarted.
> In this case, you will pick up the original FlowFile and begin processing
> it again. But you've already sent
> out those 800,000 FlowFiles. Depending on your requirements, this may or
> may not be acceptable.
>
> One option that you could use is just to document that this behavior
> exists and that SplitText should be
> used ahead of you Processor in order to split the content into 10,000 line
> chunks. This would avoid the
> heap exhaustion.
>
> Another possible solution that you could use, though it's not as pretty as
> I'd like: Process up to 10,000 FlowFiles
> from an input FlowFile. Then, add an attribute to the input FlowFile
> indicating your progress (for instance,
> add an attribute named "rows.converted" and then do
> "session.transfer(flowFile);" This will transfer the FlowFile
> back into its input queue. You can then commit the session. Then, when you
> call session.get() to get an input
> FlowFile again, you can check for that attribute and skip that many rows.
> This way, you won't end up with
> data duplication. The downside here is that you would end up reading the
> first N rows each time and ignoring
> the content which can be expensive. A more optimized approach would be to
> wrap the InputStream in
> a ByteCountingInputStream and record the number of bytes consumed and use
> that as an attribute, and then
> for each subsequent iteration use StreamUtils.skip() to skip the
> appropriate number of bytes.
>
> I know there's a lot of info here. Let me know if anything doesn't make
> sense.
>
> I hope this helps!
> -Mark
>
>
> [1] https://issues.apache.org/jira/browse/NIFI-1008 <
> https://issues.apache.org/jira/browse/NIFI-1008>
>
>
> > On Mar 11, 2016, at 5:29 PM, Devin Fisher <
> devin.fisher@perfectsearchcorp.com> wrote:
> >
> > I'm creating a processor that will read a customer csv and will create a
> > new flowfile for each line in the form of XML. The CSV file will be quite
> > large (100s of thousands of lines). I would like to commit a reasonable
> > amount from time to time so that they can flow down to other processors.
> > But looking at similar processors SplitText and SplitXml they save up all
> > the created flowfiles and release them all at the end.  In some trials,
> I'm
> > running out of memory doing that. But I can't commit the session early
> > because I'm still reading the original CSV file.  Is there a workflow
> where
> > I can read the incoming CSV flowfile but still release created flowfiles?
> > I'm thinking of not using AbstractProcessor and instead
> > use AbstractSessionFactoryProcessor and create two different sessions but
> > is that advisable or possible?
> >
> > Devin
>
>

Re: Split Content (One-to-Many) early commit

Posted by Mark Payne <ma...@hotmail.com>.
Devin,

We do realize that we have some work to do in order to make it so that
a single Processor can buffer up hundreds of thousands or more FlowFiles.
The SplitText processor is very popular and suffers from this exact same problem.
We want to have a mechanism for swapping those out of the Java Heap, similar
to how we do when we have millions of FlowFiles sitting in a queue. There is a ticket
here [1] to address this. However, this has turned out to be very time consuming, and
not quite a straight-forward as we had hoped, so it's not been finished up yet.

In the meantime, you can use the approach that you described, using two different
Process Sessions, by extending AbstractSessionFactoryProcessor instead of
AbstractProcessor. The downside to this approach, though, is that when NiFi is restarted,
you could potentially have a lot of data duplication.

As an example, let's imagine that you create a ProcessSession and use it to create 10,000 FlowFiles
and then commit the session and create a new one. If you have an incoming FlowFiles that has
1 million rows in it, you may create 800,000 FlowFiles and send them out and then NiFi gets restarted.
In this case, you will pick up the original FlowFile and begin processing it again. But you've already sent
out those 800,000 FlowFiles. Depending on your requirements, this may or may not be acceptable.

One option that you could use is just to document that this behavior exists and that SplitText should be
used ahead of you Processor in order to split the content into 10,000 line chunks. This would avoid the
heap exhaustion.

Another possible solution that you could use, though it's not as pretty as I'd like: Process up to 10,000 FlowFiles
from an input FlowFile. Then, add an attribute to the input FlowFile indicating your progress (for instance,
add an attribute named "rows.converted" and then do "session.transfer(flowFile);" This will transfer the FlowFile
back into its input queue. You can then commit the session. Then, when you call session.get() to get an input
FlowFile again, you can check for that attribute and skip that many rows. This way, you won't end up with
data duplication. The downside here is that you would end up reading the first N rows each time and ignoring
the content which can be expensive. A more optimized approach would be to wrap the InputStream in
a ByteCountingInputStream and record the number of bytes consumed and use that as an attribute, and then
for each subsequent iteration use StreamUtils.skip() to skip the appropriate number of bytes.

I know there's a lot of info here. Let me know if anything doesn't make sense.

I hope this helps!
-Mark


[1] https://issues.apache.org/jira/browse/NIFI-1008 <https://issues.apache.org/jira/browse/NIFI-1008>


> On Mar 11, 2016, at 5:29 PM, Devin Fisher <de...@perfectsearchcorp.com> wrote:
> 
> I'm creating a processor that will read a customer csv and will create a
> new flowfile for each line in the form of XML. The CSV file will be quite
> large (100s of thousands of lines). I would like to commit a reasonable
> amount from time to time so that they can flow down to other processors.
> But looking at similar processors SplitText and SplitXml they save up all
> the created flowfiles and release them all at the end.  In some trials, I'm
> running out of memory doing that. But I can't commit the session early
> because I'm still reading the original CSV file.  Is there a workflow where
> I can read the incoming CSV flowfile but still release created flowfiles?
> I'm thinking of not using AbstractProcessor and instead
> use AbstractSessionFactoryProcessor and create two different sessions but
> is that advisable or possible?
> 
> Devin