Posted to users@nifi.apache.org by Van Autreve Dries <dr...@vlaanderen.be> on 2021/04/01 07:35:58 UTC

Strict order of flow files in a cluster

Hello all

We recently started using NiFi and we were wondering if strict order of processing flow files in a cluster could be guaranteed by NiFi.

One of the use cases is as follows: messages arrive in a specific order, go through a simple flow with some basic transformations and are written to the destination (usually a relational database). The source of the messages can be a database, Kafka queue, …
It’s important that messages are written to the destination in exactly the same order they arrived at NiFi. The reason is that messages could be deltas and we do not want to overwrite newer data with older deltas. Moreover, we do not always control the message format, hence controlling this from the messaging protocol point of view might not be possible.

We did some research in various places but have not found a satisfying answer. Our own investigations have revealed that:
- Just running the first processor on the primary node is not enough, even with the load balancing strategy “single node”. While testing with stopping / starting the primary node, we had some situations where messages got out of order.
- Using the EnforceOrder processor with high timeouts prevented the messages from being processed out of order, but each time the primary node changes, manual intervention is required to reconfigure the initial order property. Moreover, it requires that the source system or first processor provides this incrementing sequence attribute.
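
A minimal sketch of the sequence-based approach described in the second point, to make the dependency on an incrementing attribute concrete. It is not EnforceOrder's implementation: the class name `ReorderBuffer` and its methods are hypothetical, and the counter lives only in memory, which illustrates why the starting value has to be set again when processing moves to a different node.

```python
class ReorderBuffer:
    """Releases items strictly in sequence order, holding back early arrivals."""

    def __init__(self, initial_order=0):
        # Next sequence number that may be emitted (the "initial order").
        self.next_expected = initial_order
        self.waiting = {}  # sequence number -> payload

    def offer(self, seq, payload):
        """Accept one item and return the payloads that are now safe to emit."""
        self.waiting[seq] = payload
        released = []
        # Drain every consecutive item starting at the expected number.
        while self.next_expected in self.waiting:
            released.append(self.waiting.pop(self.next_expected))
            self.next_expected += 1
        return released


buffer = ReorderBuffer(initial_order=1)
emitted = []
for seq, payload in [(2, "b"), (1, "a"), (4, "d"), (3, "c")]:
    emitted.extend(buffer.offer(seq, payload))
print(emitted)
```

Items that arrive early simply wait until the gap before them is filled, so a missing sequence number blocks everything behind it until it shows up (or, in a real flow, until a timeout expires).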

It also does not seem possible to pin a flow to a specific node. At least we have not found this option. We do understand that this would affect scalability and availability or failover, but that might be acceptable for those specific cases.

If there are other options we can explore, any input would be helpful.
Or if it’s not (easily) possible with NiFi on its own, it would be good to know!

-- 
Kind Regards
Dries Van Autreve


(Sorry if this will result in a double post. I was not yet subscribed when I did the first post and my message does not seem to appear in the list...)


Re: Strict order of flow files in a cluster

Posted by Van Autreve Dries <dr...@vlaanderen.be>.
Mark, thanks for the detailed information. It’s clear to us now. Boris, also thanks for your input.

--
Kind Regards
Dries Van Autreve



Re: Strict order of flow files in a cluster

Posted by Boris Tyukin <bo...@boristyukin.com>.
Thanks Mark, very nice feature indeed, but we cannot upgrade
anymore because of the new ZooKeeper version requirement. But that is another
story and an issue specific to us (we have to stay with CDH 6.2).


Re: Strict order of flow files in a cluster

Posted by Mark Payne <ma...@hotmail.com>.
Boris,

To be clear, the options to roll back on failure instead of routing to a failure relationship were added specifically for this use case of consuming CDC events (typically from Kafka). But they were only added recently, in either 1.12 or 1.13. That should make things simpler.

Thanks
-Mark


Re: Strict order of flow files in a cluster

Posted by Boris Tyukin <bo...@boristyukin.com>.
Our use case was exactly that: CDC events that we had to apply in order.
FirstInFirstOutPrioritizer was not enough, because some flowfiles will fail
but NiFi will process the next one. All the examples I've seen had all kinds
of issues, mostly with enforcing order. So our only choice was our custom
FIFO processor. We basically register each new flowfile from the Kafka
consumer in a FIFO queue, which is stored in a MySQL db, and only start newer
flowfiles if the previous one was successful.
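
A rough sketch of that gating idea, under stated assumptions: the actual processor was written in Groovy against MySQL and is not shown in this thread, so the table layout, the status values and the function names below are all hypothetical, and an in-memory SQLite database stands in for MySQL so the example is self-contained.

```python
import sqlite3

# In-memory SQLite stands in for the MySQL queue table (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fifo ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "flowfile TEXT NOT NULL, "
    "status TEXT NOT NULL DEFAULT 'PENDING')"
)


def register(flowfile):
    """Record a newly consumed flowfile at the tail of the queue."""
    conn.execute("INSERT INTO fifo (flowfile) VALUES (?)", (flowfile,))


def next_allowed():
    """Return the oldest unfinished flowfile, or None if a failure blocks the queue."""
    row = conn.execute(
        "SELECT flowfile, status FROM fifo "
        "WHERE status != 'DONE' ORDER BY id LIMIT 1"
    ).fetchone()
    if row is None or row[1] == "FAILED":
        return None
    return row[0]


def mark(flowfile, status):
    """Update the processing status of one flowfile."""
    conn.execute("UPDATE fifo SET status = ? WHERE flowfile = ?", (status, flowfile))


for name in ("A", "B", "C"):
    register(name)

first = next_allowed()      # A is the oldest entry
mark("A", "FAILED")
blocked = next_allowed()    # nothing may start while A is failed
mark("A", "PENDING")        # the issue with A has been addressed
retry = next_allowed()      # A is offered again
mark("A", "DONE")
after_fix = next_allowed()  # only now does B become eligible
```

Because eligibility is always decided by the oldest unfinished row, a failed flowfile holds back everything behind it, which is the behavior a plain FIFO prioritizer does not provide.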

We still use FirstInFirstOutPrioritizer between processors, and the
backpressure feature was also extremely helpful. We love how NiFi gives us
all the tools to troubleshoot things if they go wrong - our other option was
a custom Kafka consumer or Kafka Connect, but it all seemed too complicated
and would not give us the monitoring/logging capabilities we have with
NiFi.


Re: Strict order of flow files in a cluster

Posted by Mark Payne <ma...@hotmail.com>.
Dries,

The short answer is that, depending on your source, destination, and what you’re doing in between, it is sometimes (but not always) possible. But not particularly simple.

The longer version:

NiFi doesn’t strive to provide strict ordering guarantees. Rather, it strives to provide data prioritization. Consider a use case where NiFi is reading a temperature sensor on an oil rig with poor comms. Comms go out and 30 minutes later, they come back. If there’s a fire, we don’t want the 1,000 readings that have been taken during that 30 minutes - if there’s one that says there’s a fire, we want that one first. This is achieved using FlowFile Prioritizers.

One such prioritizer is the FirstInFirstOutPrioritizer. Using this, data in a given queue is processed in the order that it arrived in the queue. So, for a strictly linear flow (i.e., a flow that goes from Processor A to B to C, without any routing/decision making), this works as a strict ordering. But if a FlowFile is penalized, or if it is routed to a ‘failure’ relationship, the data can get out of order.
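
The difference between routing to ‘failure’ and rolling back can be illustrated with a small simulation. This is a toy model, not NiFi code: the function `run` and its parameters are hypothetical, and it assumes the ‘failure’ relationship is looped back into the same queue for a retry, so a failed event re-enters at the tail.

```python
from collections import deque


def run(events, fails_once, on_failure):
    """Process events FIFO; on_failure is 'route' (retry later) or 'rollback'."""
    queue = deque(events)
    already_failed = set()
    written = []
    while queue:
        event = queue.popleft()
        if event in fails_once and event not in already_failed:
            already_failed.add(event)
            if on_failure == "route":
                # Routed to failure and looped back: re-enters behind newer events.
                queue.append(event)
            else:
                # Rolled back: stays at the head, so nothing can overtake it.
                queue.appendleft(event)
            continue
        written.append(event)
    return written


events = ["e1", "e2", "e3", "e4"]
routed = run(events, {"e2"}, "route")
rolled_back = run(events, {"e2"}, "rollback")
print(routed)
print(rolled_back)
```

With routing, the event that failed once is written after the events that followed it; with rollback, the original order is preserved at the cost of blocking the queue until the failing event succeeds.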

However, a common flow that we do see is to have Debezium (or something similar) monitor a database for changes and publish CDC events to Kafka. NiFi then has a data flow that looks like ConsumeKafkaRecord_2_6 -> (possibly UpdateRecord/LookupRecord, etc. to perform enrichment/filtering/updating) -> PutDatabaseRecord (or PutKudu). In this flow, all connections use the FIFO Prioritizer. Also ensure that PutDatabaseRecord / PutKudu is configured in a way that it won’t route to failure - instead, if there’s a failure, it rolls back the session. Now, this handles the concern of ordering once the data is on the node, but the data must also arrive in the correct order from Kafka. So, for this case, you must also pin specific Kafka partitions to specific NiFi nodes, which can be done by adding user-defined properties, as described in the documentation.
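
As a sketch of what such a pinning could look like, the helper below spreads partitions over nodes and prints one user-defined property per node. The property name format `partitions.<hostname>` with a comma-separated list of partition numbers is an assumption based on the ConsumeKafka processor documentation and should be verified for the NiFi version in use; the hostnames `nifi-1` to `nifi-3`, the partition count and the function `pin_partitions` are made up for illustration.

```python
def pin_partitions(hostnames, partition_count):
    """Spread partitions 0..partition_count-1 round-robin over the given hosts."""
    assignment = {host: [] for host in hostnames}
    for partition in range(partition_count):
        host = hostnames[partition % len(hostnames)]
        assignment[host].append(partition)
    # Property name format is an assumption; check the processor documentation.
    return {
        f"partitions.{host}": ", ".join(str(p) for p in parts)
        for host, parts in assignment.items()
    }


properties = pin_partitions(["nifi-1", "nifi-2", "nifi-3"], 6)
for name, value in properties.items():
    print(f"{name} = {value}")
```

Each partition belongs to exactly one node, so all events of a partition pass through a single FIFO queue; the trade-off is that a node that goes down leaves its partitions unconsumed until it returns.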

Thanks
-Mark


Re: Strict order of flow files in a cluster

Posted by Boris Tyukin <bo...@boristyukin.com>.
We ended up building a simple Groovy processor that uses a MySQL database to
queue up flowfiles. If flowfile A fails, flowfile B sits in the queue until
we have addressed the issue with flowfile A. We also used the back pressure
feature to slow down the upstream Kafka consumers.

After experimenting with Wait/Notify we found it extremely difficult and
cumbersome to use. EnforceOrder did not do much for us either. Our use case
was to process Kafka messages in order on a 3-node NiFi cluster.

The Groovy processor approach worked really well for us in the end.


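A minimal sketch of the database-backed queue idea from the reply above, written in Python with SQLite standing in for MySQL so that it runs on its own. It is not the actual Groovy processor: the table name, the status values and the function names are assumptions made for illustration. The flowfile at the head of the queue blocks everything behind it until it is marked done, so a failed flowfile A keeps flowfile B waiting.

```python
import sqlite3


def open_queue(path=":memory:"):
    """Create the (hypothetical) queue table; seq records arrival order."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS ordered_queue ("
        " seq INTEGER PRIMARY KEY AUTOINCREMENT,"
        " flowfile_id TEXT NOT NULL,"
        " status TEXT NOT NULL DEFAULT 'pending')"
    )
    return conn


def enqueue(conn, flowfile_id):
    """Register an arriving flowfile at the tail of the queue."""
    with conn:  # commits on success
        cur = conn.execute(
            "INSERT INTO ordered_queue (flowfile_id) VALUES (?)", (flowfile_id,)
        )
    return cur.lastrowid


def claim_next(conn):
    """Return the head flowfile if it may be processed, else None.

    The head is the oldest entry that is not 'done'. If it is 'failed' or
    already 'in_progress', nothing behind it is released.
    """
    with conn:
        row = conn.execute(
            "SELECT seq, flowfile_id, status FROM ordered_queue"
            " WHERE status != 'done' ORDER BY seq LIMIT 1"
        ).fetchone()
        if row is None or row[2] != "pending":
            return None
        conn.execute(
            "UPDATE ordered_queue SET status = 'in_progress' WHERE seq = ?",
            (row[0],),
        )
        return row[1]


def set_status(conn, flowfile_id, status):
    """Mark a flowfile 'done', 'failed', or 'pending' again (retry)."""
    with conn:
        conn.execute(
            "UPDATE ordered_queue SET status = ? WHERE flowfile_id = ?",
            (status, flowfile_id),
        )
```

In a real cluster the claim would have to be atomic across nodes, for example through row locking in the shared database; that part is not shown here.
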
Re: Strict order of flow files in a cluster

Posted by "Dobbernack, Harald (Key-Work)" <ha...@key-work.de>.
We use a trigger mechanism based on Wait/Notify to ensure that the sequence is maintained. Basically, a new flowfile is only worked on once a corresponding special trigger flowfile has been received. As soon as a flowfile has been 'finished', the flow itself creates the trigger (a kind of receipt) for the next expected data waiting in line. This works in our use case because the incoming files carry a gap-free sequence or generation identifier as metadata.


Harald Dobbernack

Key-Work Consulting GmbH | Kriegsstr. 100 | 76133 | Karlsruhe | Germany | www.key-work.de<https://www.key-work.de> | Privacy policy<https://www.key-work.de/de/footer/datenschutz.html>
Phone: +49-721-78203-264 | E-Mail: harald.dobbernack@key-work.de

Key-Work Consulting GmbH, Karlsruhe, HRB 108695, HRG Mannheim
Managing Directors: Andreas Stappert, Tobin Wotring

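The trigger idea in Harald's Wait/Notify reply can be sketched roughly as follows. This is a plain Python model, not NiFi configuration: the class name `TriggerGate`, its methods and the `process` callback are hypothetical, and it assumes (as the reply does) that every incoming item carries a gap-free integer sequence number. An item is only processed when the trigger for its sequence number exists, and finishing it issues the trigger for the next one.

```python
from typing import Callable, Dict


class TriggerGate:
    """Release items strictly in sequence order, one trigger at a time."""

    def __init__(self, first_sequence: int, process: Callable[[str], None]):
        # The single outstanding trigger: the sequence number allowed to run.
        self._trigger = first_sequence
        # Items that arrived early and wait for their trigger.
        self._waiting: Dict[int, str] = {}
        self._process = process

    def arrive(self, sequence: int, payload: str) -> None:
        """Park the item, then run everything whose trigger is available."""
        self._waiting[sequence] = payload
        while self._trigger in self._waiting:
            self._process(self._waiting.pop(self._trigger))
            # Finishing an item acts as the receipt that triggers the next.
            self._trigger += 1
```

If sequence number 3 arrives before 1 and 2, it simply waits; once 1 and then 2 have been processed, 3 is released. A gap in the numbering would stall the gate, which is why the gap-free identifier matters.
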
Re: Strict order of flow files in a cluster

Posted by Van Autreve Dries <dr...@vlaanderen.be>.
Hello Harald

Thanks for the swift reply.

Reading up on this feature, I see in the user guide:
"This means that in a 5-node cluster, for example, there may be up to 5 incoming FlowFiles being processed simultaneously."
And in the tooltip: "Only a single FlowFile is to be allowed to enter the Process Group at a time on each node in the cluster."

So it does not apply across the cluster. This is also the behaviour I observed in a quick test.

On 01/04/2021, 10:24, "Dobbernack, Harald (Key-Work)" <ha...@key-work.de> wrote:

    I suppose the feature 'Ability to specify group level flow file concurrency - for instance run a single flow file end to end before running another for traditional job handling' available from Version 1.12 upward should be helpful here (have not tried myself yet)

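The difference between a per-node limit and a cluster-wide limit can be illustrated with a toy model. This is not NiFi code: threads stand in for nodes, a semaphore stands in for the "single FlowFile" gate, and the function name and parameters are made up for the sketch. With one gate per node, all nodes can hold a flowfile at the same moment; with one shared gate, only one can.

```python
import threading


def peak_in_flight(node_count: int, cluster_wide_gate: bool,
                   wait_seconds: float = 0.5) -> int:
    """Return the highest number of flowfiles in flight at the same time."""
    shared_gate = threading.Semaphore(1)
    # Either every node shares one gate, or each node has its own.
    gates = [shared_gate if cluster_wide_gate else threading.Semaphore(1)
             for _ in range(node_count)]
    # The barrier only opens if all nodes are inside their gate at once.
    rendezvous = threading.Barrier(node_count)
    counter_lock = threading.Lock()
    counts = {"current": 0, "peak": 0}

    def run_node(gate: threading.Semaphore) -> None:
        with gate:
            with counter_lock:
                counts["current"] += 1
                counts["peak"] = max(counts["peak"], counts["current"])
            try:
                rendezvous.wait(timeout=wait_seconds)
            except threading.BrokenBarrierError:
                pass  # the other nodes never got in while we held the gate
            with counter_lock:
                counts["current"] -= 1

    threads = [threading.Thread(target=run_node, args=(gate,)) for gate in gates]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return counts["peak"]
```

Calling `peak_in_flight(5, cluster_wide_gate=False)` models the documented per-node behaviour, while `cluster_wide_gate=True` models the cluster-wide guarantee the original question was asking for.
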

Re: Strict order of flow files in a cluster

Posted by "Dobbernack, Harald (Key-Work)" <ha...@key-work.de>.
I suppose the feature 'Ability to specify group level flow file concurrency - for instance run a single flow file end to end before running another for traditional job handling', available from version 1.12 onward, should be helpful here (I have not tried it myself yet).

