Posted to dev@nifi.apache.org by TLdZPYSamI4WHPl1 TLdZPYSamI4WHPl1 <tl...@gmail.com> on 2018/08/22 12:07:06 UTC

Reading all flowfiles queued for a processor (>20000 flowfiles)

Hi NiFi devs,

My understanding is that when I create a custom processor and get
FlowFiles with session.get(CustomFlowFileFilter), then no matter how many
times the CustomFlowFileFilter returns
FlowFileFilterResult.ACCEPT_AND_CONTINUE or
FlowFileFilterResult.REJECT_AND_CONTINUE, it will only ever loop through at
most 20000 flowfiles, where 20000 is the value of the
nifi.queue.swap.threshold setting in nifi.properties.
(Disregarding that it's actually 19999, and that the setting is not
respected when running tests, which made this SUPER confusing to debug...)
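To make the behaviour described above concrete, here is a small standalone Java model of it. It is not NiFi's FlowFileQueue code and does not use the NiFi API; the class names (ToyQueue, ToyQueueDemo) and the simple "in-memory list plus swapped list" split are assumptions chosen only to show why a filter that sees the in-memory portion can never reach items beyond the threshold.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical toy model of a connection queue (not NiFi's FlowFileQueue):
 * at most swapThreshold items are "in memory"; the rest stand in for swap files.
 */
class ToyQueue<T> {
    private final int swapThreshold;
    private final Deque<T> active = new ArrayDeque<>();  // in-memory portion
    private final Deque<T> swapped = new ArrayDeque<>(); // "swapped out" portion

    ToyQueue(int swapThreshold) {
        this.swapThreshold = swapThreshold;
    }

    void offer(T item) {
        // Fill the in-memory portion first; once anything is swapped out,
        // later arrivals also go to the swapped portion to preserve FIFO order.
        if (swapped.isEmpty() && active.size() < swapThreshold) {
            active.addLast(item);
        } else {
            swapped.addLast(item);
        }
    }

    /** Filtered poll: the predicate is only ever applied to in-memory items. */
    List<T> pollMatching(Predicate<T> accept) {
        List<T> accepted = new ArrayList<>();
        List<T> rejected = new ArrayList<>();
        while (!active.isEmpty()) {
            T item = active.pollFirst();
            if (accept.test(item)) {
                accepted.add(item);
            } else {
                rejected.add(item);
            }
        }
        active.addAll(rejected); // rejected items stay queued, in their original order
        // Swap items back in only once accepted items have freed up space.
        while (active.size() < swapThreshold && !swapped.isEmpty()) {
            active.addLast(swapped.pollFirst());
        }
        return accepted;
    }

    int activeSize() { return active.size(); }
    int swappedSize() { return swapped.size(); }
}

class ToyQueueDemo {
    public static void main(String[] args) {
        ToyQueue<Integer> q = new ToyQueue<>(5);
        for (int i = 0; i < 8; i++) {
            q.offer(i);
        }
        // Item 7 sits in the swapped portion, so the filter never gets to see it.
        System.out.println(q.pollMatching(x -> x == 7)); // []
        System.out.println(q.activeSize() + " in memory, " + q.swappedSize() + " swapped"); // 5 in memory, 3 swapped
    }
}
```

In this model a filter-only consumer can stall completely: if nothing in memory matches, nothing is removed, so nothing is ever swapped back in.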

I've attached a screenshot of this happening (also at:
https://i.imgur.com/25QJxuj.png)

My question is: is there a way to force a custom processor to read ALL
queued flowfiles in all incoming connections?

My particular use case is pairing flowfiles. While there are probably
other ways to pair files, such as the Wait/Notify processors, I'm handling
files at high throughput, with possible delays between the two halves of a
pair arriving, so it's quite easy to hit the limit. I could also increase
the swap threshold setting, but I keep hitting the problem. I've also
played with custom prioritizers on connections to maximise the chance of
both halves of a pair being available together, but because I need to move
unmatched flowfiles out and back in, this essentially creates a busy loop.
It seems like there should be a better way.
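The core of a pairing step is usually "hold the first half of a pair until its partner arrives, keyed by some correlation value". The sketch below is a standalone model of that idea, assuming each item carries a hypothetical pairKey attribute; Item, Pair and Pairer are illustrative names, not NiFi classes. It also makes the memory cost visible: every unmatched item stays in the pending map until its partner shows up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a FlowFile: an id plus the attribute used for pairing. */
final class Item {
    private final String id;
    private final String pairKey;
    Item(String id, String pairKey) { this.id = id; this.pairKey = pairKey; }
    String id() { return id; }
    String pairKey() { return pairKey; }
}

/** Hypothetical matched pair, in arrival order. */
final class Pair {
    private final Item first;
    private final Item second;
    Pair(Item first, Item second) { this.first = first; this.second = second; }
    Item first() { return first; }
    Item second() { return second; }
}

class Pairer {
    // Unmatched items waiting for their partner, keyed by pairKey.
    // This map grows with every unmatched arrival, so it must be bounded in practice.
    private final Map<String, Item> pending = new HashMap<>();

    /** Returns the completed pair if the item's partner is already waiting, otherwise null. */
    Pair accept(Item item) {
        Item partner = pending.remove(item.pairKey());
        if (partner == null) {
            pending.put(item.pairKey(), item);
            return null;
        }
        return new Pair(partner, item);
    }

    int pendingCount() { return pending.size(); }
}

class PairerDemo {
    public static void main(String[] args) {
        Pairer pairer = new Pairer();
        List<Pair> done = new ArrayList<>();
        String[][] arrivals = {{"a1", "A"}, {"b1", "B"}, {"a2", "A"}, {"c1", "C"}};
        for (String[] a : arrivals) {
            Pair pair = pairer.accept(new Item(a[0], a[1]));
            if (pair != null) {
                done.add(pair);
            }
        }
        System.out.println(done.size() + " paired, " + pairer.pendingCount() + " waiting"); // 1 paired, 2 waiting
    }
}
```

Keeping unmatched items inside the consumer, rather than routing them out and back into the queue, is what avoids the busy loop; the price is that the consumer now owns the memory for everything still waiting.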

Any ideas?

Ideally, a way to force a custom processor to read all queued
flowfiles (swapping more than the threshold into memory during a single
onTrigger call) would be the easiest solution. Is there one?

Cheers,
Sam

Re: Reading all flowfiles queued for a processor (>20000 flowfiles)

Posted by Mark Payne <ma...@hotmail.com>.
Hi Sam,

There are a couple of ways to tackle this problem. My recommendation would be to look at extending the BinFiles processor.
This is an abstract class, which MergeContent extends (and I think 1 or 2 other processors?). Its job is to bin 'like' FlowFiles together,
and it takes care of pulling data from queues and efficiently binning the FlowFiles together. It is important, though, to keep in mind
that FlowFiles carry attribute maps, and those can quickly exhaust your heap when you're trying to hold tens or hundreds of thousands
of FlowFiles in a single processor.
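As a rough illustration of the binning idea only (this is not the BinFiles API, whose abstract methods are not reproduced here), the standalone model below groups items into bins by a group id, treats a bin as complete once it holds minEntries items (2 for pairing), and caps the number of open bins so heap use stays bounded. The names Binner, minEntries and maxBins, and the evict-oldest policy, are assumptions for this sketch; MergeContent exposes comparable settings such as "Minimum Number of Entries", "Maximum number of Bins" and "Correlation Attribute Name", so check its documentation for the real behaviour.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified binning model; not the NiFi BinFiles API. */
class Binner {
    private final int minEntries;
    private final int maxBins;
    // Insertion-ordered, so the oldest open bin is always first.
    private final Map<String, List<String>> bins = new LinkedHashMap<>();
    private final List<List<String>> evicted = new ArrayList<>();

    Binner(int minEntries, int maxBins) {
        if (minEntries < 1 || maxBins < 1) {
            throw new IllegalArgumentException("minEntries and maxBins must be >= 1");
        }
        this.minEntries = minEntries;
        this.maxBins = maxBins;
    }

    /** Adds an item to its group's bin; returns the bin if it is now complete, else null. */
    List<String> add(String groupId, String item) {
        List<String> bin = bins.get(groupId);
        if (bin == null) {
            if (bins.size() >= maxBins) {
                // Assumed policy: evict the oldest open bin to make room.
                Iterator<Map.Entry<String, List<String>>> it = bins.entrySet().iterator();
                evicted.add(it.next().getValue());
                it.remove();
            }
            bin = new ArrayList<>();
            bins.put(groupId, bin);
        }
        bin.add(item);
        if (bin.size() >= minEntries) {
            bins.remove(groupId); // complete bins leave memory immediately
            return bin;
        }
        return null;
    }

    int openBins() { return bins.size(); }
    List<List<String>> evicted() { return evicted; }
}

class BinnerDemo {
    public static void main(String[] args) {
        Binner b = new Binner(2, 2);
        b.add("A", "a1");
        b.add("B", "b1");
        b.add("C", "c1");                     // bin A is evicted: only 2 bins may be open
        System.out.println(b.add("B", "b2")); // [b1, b2]
        System.out.println(b.evicted());      // [[a1]]
    }
}
```

Capping open bins trades completeness for bounded memory: an incomplete bin that is pushed out has to be routed somewhere (for example a failure or retry path), which is the same trade-off the heap warning above points at.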

Thanks
-Mark
