Posted to dev@nifi.apache.org by Paresh Shah <Pa...@lifelock.com> on 2015/12/11 05:37:15 UTC

Questions about the ordering of the FlowFile.

Here’s my use case.
We have an application protocol between the start and end processors in a data flow that expects the flow files to arrive in the order they are generated. For example:

Start Record Flowfile
<many Data Records Flowfiles>
End Record Flowfile.

The first processor does the following.

  1.  Generates and transfers the StartRecord flow file.
  2.  Generates data records and transfers them.
  3.  Generates and transfers the EndRecord flow file.

The last processor in the data flow does the following.

  1.  Looks for the StartRecord flow file and does its thing.
  2.  Looks for the DataRecord flow files and does its thing.
  3.  Looks for the EndRecord flow file, then updates and cleans up the target state.

The first processor is doing multiple transfers on the session object before calling commit.

We see that the last processor receives them in what appears to be random order. As a result, we are not able to execute the app protocol. We have tried both the FirstInFirstOutPrioritizer and the OldestFlowFilePrioritizer.

We would appreciate any insights into this, as it seems to be a blocking issue for us.

Thanks
Paresh

Re: Questions about the ordering of the FlowFile.

Posted by Mark Payne <ma...@hotmail.com>.
Paresh,

You are right - it appears that there is actually a bug in the prioritizer. It is sorting based on the FlowFile's
"Lineage Start Date", i.e., the timestamp of when the FlowFile's greatest "ancestor" entered the flow. Since
all of the FlowFiles that you are producing have the same parent, they will have the same Lineage Start Date
and therefore will not be ordered properly. I submitted a ticket [1] to address this behavior, so that if two FlowFiles
have the same timestamp, we will do a secondary sort based on the one-up number assigned to the FlowFile.
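
To illustrate the tie-break idea, here is a minimal sketch. It is not the NiFi prioritizer code: QueuedFile, TieBreakOrdering and the field names are hypothetical stand-ins, and the timestamps and one-up numbers are made-up values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a queued FlowFile; NOT the NiFi FlowFile interface.
final class QueuedFile {
    final String name;
    final long lineageStartDate; // when the oldest ancestor entered the flow (ms)
    final long id;               // one-up number assigned when the file was created

    QueuedFile(String name, long lineageStartDate, long id) {
        this.name = name;
        this.lineageStartDate = lineageStartDate;
        this.id = id;
    }
}

final class TieBreakOrdering {
    // Primary key only: files with the same lineage start date compare as equal.
    static final Comparator<QueuedFile> BY_LINEAGE_ONLY =
            Comparator.comparingLong((QueuedFile f) -> f.lineageStartDate);

    // Same primary key, with the one-up number as a secondary sort key.
    static final Comparator<QueuedFile> WITH_TIE_BREAK =
            BY_LINEAGE_ONLY.thenComparingLong(f -> f.id);

    // Sorts a copy of the list and returns the names in the resulting order.
    static List<String> sortedNames(List<QueuedFile> files, Comparator<QueuedFile> order) {
        List<QueuedFile> copy = new ArrayList<>(files);
        copy.sort(order);
        List<String> names = new ArrayList<>();
        for (QueuedFile f : copy) {
            names.add(f.name);
        }
        return names;
    }

    public static void main(String[] args) {
        // All three files share one parent, so they share one lineage start date.
        List<QueuedFile> queue = Arrays.asList(
                new QueuedFile("EndRecord", 1000L, 3L),
                new QueuedFile("StartRecord", 1000L, 1L),
                new QueuedFile("DataRecord", 1000L, 2L));
        System.out.println(sortedNames(queue, BY_LINEAGE_ONLY));
        System.out.println(sortedNames(queue, WITH_TIE_BREAK));
    }
}
```

With only the primary key, the three files compare as equal and simply keep whatever order they were queued in. With the secondary key they come out as StartRecord, DataRecord, EndRecord.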

In the meantime, you could consider the 'priority' attribute, as Brandon is suggesting here, as a workaround.
You could add a 'priority' attribute to put FlowFiles in the order you want and use the PriorityAttributePrioritizer
to sort appropriately.
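
As a rough sketch of how the 'priority' values could be generated: the helper below (PriorityValues and priorityFor are hypothetical names, and the 10-digit width is an arbitrary choice) zero-pads a sequence number so that the values sort the same way whether they are compared as numbers or as plain strings. In a processor, the result would be attached with something like session.putAttribute(flowFile, "priority", value).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Hypothetical helper; not part of NiFi.
final class PriorityValues {

    // Zero-pads the sequence number to a fixed width of 10 digits.
    // Lower sequence numbers give values that sort first.
    static String priorityFor(long sequence) {
        if (sequence < 0) {
            throw new IllegalArgumentException("sequence must be non-negative");
        }
        return String.format(Locale.ROOT, "%010d", sequence);
    }

    public static void main(String[] args) {
        // Sequence numbers as they might be assigned: 0 for the StartRecord,
        // 1..n for the data records, n+1 for the EndRecord.
        List<String> priorities = new ArrayList<>();
        for (long seq : new long[] {12, 2, 0, 100}) {
            priorities.add(priorityFor(seq));
        }
        Collections.sort(priorities); // plain string comparison
        System.out.println(priorities);
    }
}
```

Without the padding, a plain string comparison would put "12" ahead of "2", which is the kind of surprise the fixed width is meant to avoid.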

Thanks
-Mark


[1] https://issues.apache.org/jira/browse/NIFI-1279






Re: Questions about the ordering of the FlowFile.

Posted by Brandon DeVries <br...@jhu.edu>.
Paresh,

     You might want to look at the PriorityAttributePrioritizer[1]:

*PriorityAttributePrioritizer*: Given two FlowFiles that both have a
"priority" attribute, the one that has the highest priority value will be
processed first. Note that an UpdateAttribute processor should be used to
add the "priority" attribute to the FlowFiles before they reach a
connection that has this prioritizer set. Values for the "priority"
attribute may be alphanumeric, where "a" is a higher priority than "z", and
"1" is a higher priority than "9", for example.

     You can set a "priority" attribute in your custom processor.  However,
I would caution against absolutely relying on in-order delivery. Just
because a FlowFile begins processing first doesn't mean it will complete
first (assuming the processor has multiple concurrent tasks).  If it is
only critical that they be in order for the last processor, you might also
consider the MergeContent processor[2] in "Defragment" mode.  Similar to the
"priority" attribute, you would set a "fragment.identifier" common to all
of the FlowFiles comprising a record, and then a "fragment.index" for each
FlowFile in the record.  At the end of the flow, you could then create a
single FlowFile comprised of all of the pieces of the record, in order.
Alternatively, you could extend the same class as MergeContent
(BinFiles[3]) in your last processor to ensure that all files are received
in order before beginning the final step.  Hope this helps.
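
To make the "wait until everything has arrived, then process in order" idea concrete, here is a minimal sketch in plain Java. It does not use the NiFi API or BinFiles: FragmentReassembler and offer are hypothetical names, the payloads are plain strings, and the expected fragment count is passed in explicitly (as far as I know, MergeContent in "Defragment" mode reads it from a "fragment.count" attribute).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reorder buffer; not NiFi code.
final class FragmentReassembler {

    // fragment.identifier -> (fragment.index -> payload), kept sorted by index
    private final Map<String, TreeMap<Integer, String>> pending = new HashMap<>();

    // Buffers one fragment. Returns all payloads of the record in index order
    // once 'count' distinct indexes have arrived, otherwise an empty list.
    List<String> offer(String identifier, int index, int count, String payload) {
        TreeMap<Integer, String> parts =
                pending.computeIfAbsent(identifier, k -> new TreeMap<>());
        parts.put(index, payload);
        if (parts.size() < count) {
            return new ArrayList<>(); // still waiting for more fragments
        }
        pending.remove(identifier);
        return new ArrayList<>(parts.values());
    }

    public static void main(String[] args) {
        FragmentReassembler buffer = new FragmentReassembler();
        // Fragments arrive out of order; nothing is released until all are present.
        System.out.println(buffer.offer("record-1", 2, 3, "EndRecord"));
        System.out.println(buffer.offer("record-1", 0, 3, "StartRecord"));
        System.out.println(buffer.offer("record-1", 1, 3, "DataRecord"));
    }
}
```

The last call is the only one that returns anything, and it returns StartRecord, DataRecord, EndRecord regardless of arrival order. A real processor would also need a timeout or size limit for records that never complete.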

[1]
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#connecting-components
[2]
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.MergeContent/index.html
[3]
https://github.com/apache/nifi/blob/31fba6b3332978ca2f6a1d693f6053d719fb9daa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java


Brandon

