Posted to dev@nifi.apache.org by 尹文才 <ba...@gmail.com> on 2017/07/10 07:43:35 UTC

FlowFile position when transferred to Relationship.SELF

Hi guys, I have written a custom processor whose functionality is
similar to NiFi's Wait processor. The difference is that my processor
needs to wait for a batch of data, and when the batch end flag is found,
it transfers the batch to its destinations.

I checked the source code of the Wait processor and, like it, I transfer
FlowFiles to Relationship.SELF (the incoming queue) while the batch of
data is not yet complete. The problem is that the FlowFiles transferred
from my processor to the destinations are sometimes out of order.
I then added a sequence attribute (a number starting from 1) to all
FlowFiles coming into my processor and verified that the problem happens
from time to time, but I couldn't find a reliable way to reproduce it.
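
For illustration only, here is a minimal sketch, outside NiFi, of one way
a batch could be released in sequence order no matter how the items
arrive. It is not the processor described above: the class and field
names are hypothetical, and it assumes that sequence numbers start at 1
within each batch and that the item carrying the end flag has the highest
sequence number.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class BatchCollectorSketch {

    // Hypothetical stand-in for a FlowFile with a sequence attribute and an end flag.
    static final class Item {
        final int sequence;
        final boolean batchEnd;

        Item(int sequence, boolean batchEnd) {
            this.sequence = sequence;
            this.batchEnd = batchEnd;
        }
    }

    private final List<Item> pending = new ArrayList<>();
    // Batch size is unknown (-1) until the end-flag item has been seen.
    private int expectedSize = -1;

    // Buffers the item. Returns the whole batch sorted by sequence once every
    // item has arrived; otherwise returns an empty list.
    List<Item> offer(Item item) {
        pending.add(item);
        if (item.batchEnd) {
            // Assumption: the end-flag item has the highest sequence number.
            expectedSize = item.sequence;
        }
        if (expectedSize < 0 || pending.size() < expectedSize) {
            return List.of();
        }
        List<Item> batch = new ArrayList<>(pending);
        batch.sort(Comparator.comparingInt((Item i) -> i.sequence));
        pending.clear();
        expectedSize = -1;
        return batch;
    }

    static List<Integer> sequences(List<Item> items) {
        List<Integer> result = new ArrayList<>();
        for (Item item : items) {
            result.add(item.sequence);
        }
        return result;
    }

    public static void main(String[] args) {
        BatchCollectorSketch collector = new BatchCollectorSketch();
        collector.offer(new Item(2, false));
        collector.offer(new Item(3, true));
        List<Item> batch = collector.offer(new Item(1, false));
        System.out.println(sequences(batch)); // prints [1, 2, 3]
    }
}
```

Sorting at release time makes the output order independent of the order
in which items are pulled from the queue, at the cost of holding the
whole batch in memory.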

My question is: how does NiFi handle a FlowFile when it is transferred
to Relationship.SELF? Does it put the FlowFile back in its original
position in the incoming queue? Thanks.

Regards,
Ben

Re: FlowFile position when transferred to Relationship.SELF

Posted by Koji Kawamura <ij...@gmail.com>.
Hi Ben,

The FIFO prioritizer uses lastQueueDate, and when a FlowFile is
transferred to SELF, lastQueueDate is updated, so I think FIFO
may not work as you expect.
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java#L34

OldestFlowFileFirstPrioritizer is probably more appropriate for your case,
since it orders by lineage start date, which a transfer to SELF does not
change.
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java
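
To make the difference concrete, here is a simplified model of the two
orderings. It is a sketch, not the NiFi classes linked above: the
QueuedFlowFile type and its fields are hypothetical stand-ins, and the
tie-breaking and null handling of the real prioritizers are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class PrioritizerSketch {

    // Hypothetical stand-in for a queued FlowFile; not the NiFi FlowFile interface.
    static final class QueuedFlowFile {
        final int sequence;
        final long lineageStartDate;
        final long lastQueueDate;

        QueuedFlowFile(int sequence, long lineageStartDate, long lastQueueDate) {
            this.sequence = sequence;
            this.lineageStartDate = lineageStartDate;
            this.lastQueueDate = lastQueueDate;
        }

        // Models a transfer to SELF: same FlowFile, refreshed queue date.
        QueuedFlowFile requeuedAt(long now) {
            return new QueuedFlowFile(sequence, lineageStartDate, now);
        }
    }

    // FIFO-like ordering: earliest lastQueueDate first.
    static final Comparator<QueuedFlowFile> FIFO_LIKE =
            Comparator.comparingLong((QueuedFlowFile f) -> f.lastQueueDate);

    // Oldest-first-like ordering: earliest lineageStartDate first.
    static final Comparator<QueuedFlowFile> OLDEST_FIRST_LIKE =
            Comparator.comparingLong((QueuedFlowFile f) -> f.lineageStartDate);

    // Returns the sequence numbers in the order the given comparator would yield.
    static List<Integer> sequencesInOrder(List<QueuedFlowFile> queue,
                                          Comparator<QueuedFlowFile> prioritizer) {
        List<QueuedFlowFile> sorted = new ArrayList<>(queue);
        sorted.sort(prioritizer);
        List<Integer> result = new ArrayList<>();
        for (QueuedFlowFile f : sorted) {
            result.add(f.sequence);
        }
        return result;
    }

    // Three FlowFiles created at times 100, 200 and 300; the first one is
    // sent back to SELF at time 250, which refreshes only its lastQueueDate.
    static List<QueuedFlowFile> sampleQueue() {
        QueuedFlowFile first = new QueuedFlowFile(1, 100L, 100L);
        QueuedFlowFile second = new QueuedFlowFile(2, 200L, 200L);
        QueuedFlowFile third = new QueuedFlowFile(3, 300L, 300L);
        return List.of(first.requeuedAt(250L), second, third);
    }

    public static void main(String[] args) {
        List<QueuedFlowFile> queue = sampleQueue();
        System.out.println(sequencesInOrder(queue, FIFO_LIKE));         // prints [2, 1, 3]
        System.out.println(sequencesInOrder(queue, OLDEST_FIRST_LIKE)); // prints [1, 2, 3]
    }
}
```

In this model the requeued FlowFile falls behind the second one under the
FIFO-like ordering, while the oldest-first ordering keeps the original
sequence.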

Thanks,
Koji

Re: FlowFile position when transferred to Relationship.SELF

Posted by 尹文才 <ba...@gmail.com>.
Hi Koji, thanks for the explanation. I checked the NiFi documentation you
provided. Do you mean I should use the FIFO prioritizer in my case? As you
mentioned, the FlowFiles would be put back into their original positions,
so as I understand it, using FIFO should keep the FlowFiles in a
consistent order.

Regards,
Ben

Re: FlowFile position when transferred to Relationship.SELF

Posted by Koji Kawamura <ij...@gmail.com>.
Hi,

I think it puts a FlowFile back in its original position but updates
the queued date, as implemented here:
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java#L1851

In order to pull FlowFiles from a queue in consistent order, you need
to specify a prioritizer.
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#prioritization

I'm just curious about the functionality you added. The Wait processor
has 'Releasable FlowFile Count', which could be used to make a batch of
FlowFiles wait and go. Notify's 'Signal Counter Delta' could be
useful, too.

Regards,
Koji
