Posted to dev@nifi.apache.org by Chris Lundeberg <cl...@1904labs.com> on 2019/08/19 16:08:40 UTC

Wait/Notify Question

Hi all,

I wanted to throw out a question to the larger community before I went down
a different path.  I might be looking at this wrong or making assumptions I
shouldn't.

Recently I started working with the Wait and Notify processors a bit more.
I have a new flow which is a bit more batch in nature and these processors
seem to work nicely for being able to intelligently wait for chunks or
files to be processed, before moving on to the next step.  I have one
specific pattern that I haven't solved with the inbuilt functionality,
which is:

1. I have an incoming zip file from SFTP.  That zip contains n files, and
each of those files needs to be split in some way.  I won't know the number
of files within the zip in advance.

2.  After they have been split correctly, a few transformations run on each
of the files.

3.  At the end of the transformation process, these various files will be
merged into 5 specific outbound file formats, to be sent to an outbound
SFTP server.  *Note*: I am not splitting and merging the same files back
together (I have looked at the fragment index stuff).

I found a nice solution for being able to count the number of flowfiles
after the split, so I know exactly how many files should be transformed and
thus I know what my "Target Signal Count" should be within the Wait
processor.  At the moment I have a counting process to (1) Fetch
Distributed MapCache, (2) Replace text (incrementing the count number from
the fetch, if a number is found), and (3) Put Distributed MapCache.  This
process works as expected and I have a valid key/value pair in the MapCache
for that particular process (I create a BatchID so it's very specific for
each pull from the SFTP processor).  The only way I know how to
intelligently provide that information back to the Wait processor is to
pull that value with a Fetch Distributed MapCache right before the flowfile
enters the Wait processor.  In theory each flowfile waiting would have the
same attribute from the Fetch process and each attribute would be the same
count.  However this doesn't always work because there could exist a
condition where the transformations happen before the counting has been
done and published to the MapCache Server.  So in this scenario you end up
with some flowfiles having a lower count than others or just not having the
"true" count.  Now, I can put additional gates in place such as trying to
slow down the flowfiles at specific sections to try and allow the counting
to be done first, but it's not a perfect science.
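
To make the miscount concrete, here is a minimal Python sketch of the
fetch/increment/put counting loop described above. It is only an
illustration: a plain dict stands in for the DistributedMapCache server, and
the names count_one, fetch_target_count, and "batch-001" are hypothetical,
not part of NiFi.

```python
# Hypothetical stand-in for the DistributedMapCache server: a plain dict.
cache = {}


def count_one(batch_id):
    """One pass of the counting loop for a single split flowfile."""
    current = cache.get(batch_id)                          # (1) fetch
    updated = 1 if current is None else int(current) + 1   # (2) increment
    cache[batch_id] = str(updated)                         # (3) put
    return updated


def fetch_target_count(batch_id):
    """What a flowfile reads if it fetches the count just before Wait."""
    value = cache.get(batch_id)
    return None if value is None else int(value)


batch_id = "batch-001"  # illustrative BatchID
seen = []
for _ in range(5):
    # Five splits are counted one at a time.
    count_one(batch_id)
    # A fast flowfile may fetch at any point while counting is in progress.
    seen.append(fetch_target_count(batch_id))

print(seen)  # [1, 2, 3, 4, 5]
```

Only the last fetch sees the true count of 5. Any flowfile that fetched
earlier carries a lower target into Wait, which is the miscount described
above.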

I thought ideally it would be good to allow the Wait processor to pull
directly from the MapCache if I could provide the key it would need for a
lookup, within the "Target Signal Count" field.  It could use the signal
coming from Notify to say "I have X number of Notify, for this signal" and
use the count value I have set in the MapCache to say "This is the total
number of files I need to see from Notify, for that same signal". This way,
I could run the Wait processor every few seconds and the chances of running
into a miscount condition would be far less.  Is there any way currently
where this processor could pull directly from the cache, or does it have to
rely on an attribute within the flowfile itself?  I think it's the latter,
but I want to make sure someone doesn't have a better idea.

Sorry for the long message. Thanks!


Chris Lundeberg

Re: Wait/Notify Question

Posted by Chris Lundeberg <cl...@1904labs.com>.
Thanks for the feedback, Koji - I appreciate it.  I will submit a Jira for
the below and implement an alternate solution for the potential race
condition.

Thanks!

Chris Lundeberg





Re: Wait/Notify Question

Posted by Koji Kawamura <ij...@gmail.com>.
Hi Chris,

You are correct: the Wait processor has to rely on an attribute within a
FlowFile to determine the target signal count.
I think letting Wait fetch the target signal count from
DistributedMapCache would be a nice improvement.

Please create a JIRA for further discussion. I guess we will need to
add a property such as "Fetch Target Signal Count from Cache Service",
a boolean that defaults to false. If enabled, the Wait processor treats the
configured "Target Signal Count" value as a key in the
DistributedMapCache, then fetches the value to use as the target count. If
the key is not found, the Wait processor transfers the FlowFile to the
wait relationship.
https://issues.apache.org/jira/projects/NIFI
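
As a rough illustration of the proposed behavior, the Python sketch below
models the lookup with a dict in place of the cache service. This is not
NiFi code, and the property does not exist yet. The function names, the
fetch_from_cache flag, and the "batch-001.count" key are assumptions made
for the example. Only the "wait" and "success" relationship names come from
the Wait processor itself.

```python
def resolve_target_count(configured_value, cache, fetch_from_cache=False):
    """Return the target signal count, or None if it is not available yet."""
    if not fetch_from_cache:
        # Current behavior: the configured value is the count itself.
        return int(configured_value)
    # Proposed behavior: the configured value is a key into the cache.
    cached = cache.get(configured_value)
    return None if cached is None else int(cached)


def route(signal_count, configured_value, cache, fetch_from_cache=False):
    """Pick the relationship for a FlowFile given the signals seen so far."""
    target = resolve_target_count(configured_value, cache, fetch_from_cache)
    if target is None or signal_count < target:
        # Key not found, or not enough Notify signals yet: keep waiting.
        return "wait"
    return "success"


# Hypothetical key holding the final count for one batch.
cache = {"batch-001.count": "5"}
print(route(5, "batch-001.count", cache, fetch_from_cache=True))  # success
print(route(5, "batch-002.count", cache, fetch_from_cache=True))  # wait
```

Because the lookup happens each time Wait evaluates a FlowFile, a FlowFile
that arrives before the count is published stays in the wait relationship
instead of carrying a stale attribute.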

Adding FetchDistributedMapCache right before Wait provides the same
result. But if the Wait processor can fetch it itself, we can reduce the
number of fetch operations required to process multiple FlowFiles at Wait.

To avoid the race condition in which Wait processes FlowFiles before the
counting part finishes, I'd use two keys in the counting part: a
temporary one to accumulate the count, and the final one (the signal
identifier), written once the counting has finished.
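
The two-key idea can be sketched in Python as follows. A dict again stands
in for the cache, and the key names and helper functions are hypothetical.
The sketch also assumes the flow has some way of knowing when counting is
complete, which is outside the scope of this example.

```python
cache = {}  # hypothetical stand-in for the DistributedMapCache server


def accumulate(temp_key):
    """Increment the running count under the temporary key."""
    cache[temp_key] = str(int(cache.get(temp_key, "0")) + 1)


def publish_final(temp_key, final_key):
    """Once counting has finished, move the total to the final key."""
    cache[final_key] = cache.pop(temp_key)


def target_for_wait(final_key):
    """Readers only ever look at the final key, never the temporary one."""
    value = cache.get(final_key)
    return None if value is None else int(value)


temp_key = "batch-001.counting"  # illustrative key names
final_key = "batch-001.count"

for _ in range(5):
    accumulate(temp_key)

before = target_for_wait(final_key)  # counting not yet published
publish_final(temp_key, final_key)
after = target_for_wait(final_key)

print(before, after)  # None 5
```

A reader sees either no value or the complete total, never a partial count,
so a FlowFile that arrives early can simply be retried until the final key
exists.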

Thanks,
Koji
