Posted to dev@nifi.apache.org by Eric Secules <es...@gmail.com> on 2020/07/31 01:13:30 UTC

NiFi Merge Content Processor Use Case

Hello,

I have a use case for the MergeContent processor where I have split the
flow into two branches (the original flowfile on one branch and a PDF on
the other; either branch may take longer than the other), and I want to
rejoin those branches using the Defragment strategy, keyed on the UUID
of the flowfile before the split, to determine whether both branches
have completed successfully. I noticed that as I increased the number of
flowfiles generated into the system, I got more merge failures, because
bins were forced to the failure relationship before they were able to
fully defragment. I can increase the number of bins, but this is just a
workaround because it doesn't solve the underlying problem. Is there a
design pattern for accurately merging diverged branches back together
that holds up under load and doesn't require me to guess a magic number
for the number of bins?
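
For concreteness, the wiring looks roughly like this (the fragment.*
attribute names are the ones MergeContent's Defragment strategy keys on;
the property values shown are illustrative):

    UpdateAttribute (before the fork):
        fragment.identifier = ${uuid}  # UUID of the flowfile before the split
        fragment.count      = 2        # two branches to rejoin

    UpdateAttribute (one per branch):
        fragment.index = 0             # original-flowfile branch
        fragment.index = 1             # PDF branch

    MergeContent:
        Merge Strategy         = Defragment
        Maximum number of Bins = <the magic number I'd like to avoid guessing>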

Thanks,
Eric

Re: NiFi Merge Content Processor Use Case

Posted by Eric Secules <es...@gmail.com>.
I have created a JIRA issue for this:
https://issues.apache.org/jira/browse/NIFI-7699


Re: NiFi Merge Content Processor Use Case

Posted by Eric Secules <es...@gmail.com>.
Hi Joe,

Thanks for your suggestions and analysis!

I was also thinking that the MergeContent processor could have a "wait"
relationship, so that we could distinguish between new flowfiles and
flowfiles that didn't fit into a bin before. Does NiFi have the ability
to distinguish between flowfiles that come from different input
relationships? Making that distinction might help with the "harder" part
you mentioned.
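
Sketched as flow wiring, the idea (proposed behavior, not something
MergeContent does today) would be:

    MergeContent
        merged  -> downstream
        failure -> genuine failures (e.g. bins that expired incomplete)
        wait    -> back to MergeContent's own input (self-loop for
                   flowfiles that simply didn't fit a bin yet)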

I would also suggest updating the property description on "Maximum
number of Bins" to say that a bin can also be routed to failure if the
maximum number of bins is in use and a flowfile arrives that does not
fit in an existing bin. It would also help to log a message when a bin
is forced to close because the maximum number of bins was reached. That
would help users realize that the number of bins must be set high
enough, or flowfiles will be routed to failure when the system gets too
busy.
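
For example, the description could gain a sentence along these lines
(suggested wording on my part, not current NiFi text):

    "If all bins are in use and an incoming FlowFile does not fit in
    any of them, the oldest bin is forced out to make room; under the
    Defragment strategy, an incomplete bin forced out this way is
    routed to failure."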

Eric


Re: NiFi Merge Content Processor Use Case

Posted by Joe Witt <jo...@gmail.com>.
Eric

I believe for now the most appropriate option available is to figure out
what the maximum number of outstanding in-process bins could be and set
that. Hopefully there is enough memory in the system to handle whatever
that worst case could be in terms of flowfile attributes being held in
memory.

What should then be considered is how to really support what you want
here: a reasonable cap on the number of bins, with the processor
behaving differently such that it only kicks out bins when they expire.
MergeContent supports some pretty powerful/complex cases, and I think
this is one that potentially needs to be included. The "harder" part
historically has been that we could keep reading off the incoming
flowfile queue to see if any bin in the list could take a flowfile, but
we'd find nothing fits and put the flowfile right back in the queue,
likely right back on top, so we'd end up doing nothing. However, I think
we could revisit that now and utilize a FIFO queue, for instance, to
overcome that. It just needs a JIRA and some analysis to get over the
real hump here.
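
As a rough illustration of the difference (a hypothetical sketch in
plain Java, not NiFi's actual binning code), compare putting an
unbinnable flowfile back on top of the queue, where it would be polled
again immediately, with requeueing it at the tail:

    import java.util.ArrayDeque;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Queue;

    // Hypothetical model: a "flowfile" is just its fragment identifier,
    // and a "bin" is a count of fragments seen for that identifier.
    public class BinningSketch {

        public static void main(String[] args) {
            Queue<String> incoming = new ArrayDeque<>(); // FIFO: requeued items go to the tail
            Map<String, Integer> bins = new HashMap<>();
            final int maxBins = 2;

            incoming.add("a");
            incoming.add("b");
            incoming.add("c"); // no free bin for "c" yet; with FIFO it waits at the tail
            incoming.add("a");

            int consecutiveSkips = 0;
            while (!incoming.isEmpty() && consecutiveSkips < incoming.size()) {
                String fragmentId = incoming.poll();
                if (bins.containsKey(fragmentId) || bins.size() < maxBins) {
                    bins.merge(fragmentId, 1, Integer::sum); // fits an existing or a new bin
                    consecutiveSkips = 0;
                } else {
                    // Putting it "right back on top" (a stack) would make us
                    // poll this same flowfile forever; requeueing at the tail
                    // lets the rest of the queue get a look at the bins first.
                    incoming.add(fragmentId);
                    consecutiveSkips++;
                }
            }
            // "a" and "b" each got a bin; "c" is still waiting for one to free up
            System.out.println("bins = " + bins + ", still waiting = " + incoming);
        }
    }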

Thanks



NiFi Merge Content Processor Use Case

Posted by Eric Secules <es...@gmail.com>.
Is it possible to surround the MergeContent processor with Wait/Notify
to enforce that only <numBuckets> unique fragment identifiers are
allowed into the merge process at one time? I'd rather have the merge
processor only force bins out based on time. If there's contention for
bins, I'd rather the incoming flowfiles wait, and only have existing
bins expire after a timeout.
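
For reference, the standard Wait/Notify join gates one branch on the
other (it isn't quite the global <numBuckets> semaphore described
above, so treat this as a rough sketch); the property names below are
from the Wait and Notify processors, backed by a
DistributedMapCacheServer:

    Notify (end of the PDF branch):
        Release Signal Identifier = ${fragment.identifier}

    Wait (end of the original-flowfile branch):
        Release Signal Identifier = ${fragment.identifier}
        Expiration Duration       = 10 min   # illustrative timeout
        success -> downstream (both branches are done)
        wait    -> back to Wait (self-loop while no signal yet)
        expired -> failure handling after the timeout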


Re: NiFi Merge Content Processor Use Case

Posted by Eric Secules <es...@gmail.com>.
Sorry, I should have sent this to the users mailing list.


