Posted to users@nifi.apache.org by Theo Diefenthal <th...@scoop-software.de> on 2021/10/03 00:37:34 UTC

Does NiFi provide end-to-end exactly once semantics with kafka sink and replayable source?

Hi there, 

At the moment, I'm wondering whether NiFi provides end-to-end exactly-once semantics in a case where it should in general be possible (speaking not in terms of NiFi but from a technical viewpoint). My example: reading a file from the filesystem and writing its contents to Kafka. Apache Flink, for instance, can provide this guarantee when checkpoints are enabled and the producer is set up in exactly-once mode (i.e. with transactions enabled).
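To make that concrete, here is a minimal sketch of such a file-to-Kafka job, assuming Flink 1.14+'s KafkaSink API (the broker address, topic and file path are placeholders):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FileToKafkaExactlyOnce {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoints are required: the sink pre-commits a Kafka transaction on each
            // checkpoint and commits it once the checkpoint completes.
            env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

            KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers("broker:9092")
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            .setTopic("my-topic")
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .setTransactionalIdPrefix("file-to-kafka")
                    // Must stay below the broker's transaction.max.timeout.ms.
                    .setProperty("transaction.timeout.ms", "600000")
                    .build();

            // Replayable source + transactional sink: records only become visible to
            // consumers reading with isolation.level=read_committed.
            env.readTextFile("/data/input.txt").sinkTo(sink);
            env.execute("file-to-kafka");
        }
    }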

With regards to NiFi, I read some unclear statements: 

- In the NiFi docs [1], I found nothing regarding exactly-once, only a statement that lets me deduce at-least-once delivery: "A core philosophy of NiFi has been that even at very high scale, guaranteed delivery is a must. This is achieved through effective use of a purpose-built persistent write-ahead log and content repository."
- In a NiFi crash course from 2018 by Hortonworks [2], I found a slide listing challenges for dataflow systems [at 08:43] where the ""Exactly once" delivery" problem is mentioned twice. As this is a crash course on, and an advertisement for, NiFi, I think I can/should deduce from that slide that NiFi addresses and solves the exactly-once challenge for me.
- In the book "Practical Real-time Data Processing and Analytics" from 2017 by Shilpi Saxena und Saurabh Gupta, it is written: "Let's start with NiFi. It is a guaranteed delivery processing engine (exactly once) by default, which maintains write-ahead logs and a content repository to achieve this." 
- In [3], back in 2016, before Kafka had released version 0.11 (which first enabled end-to-end exactly-once semantics when producing), Mark Payne, a lead developer of NiFi, wrote that "NiFi generally will guarantee At Least Once delivery of your data", which I'd say is in line with the current NiFi docs. But he also wrote that, in general, exactly-once semantics can be achieved in a distributed system if the source is replayable and the sink can go along with that (he explicitly mentions deduplication, but I think from today's perspective, a sink supporting a two-phase commit (like Kafka) should in general work as well, as Flink demonstrates).
- In [4], from 2017, a community user wrote a knowledge article that explicitly mentions two-phase commits in NiFi for Kafka and NiFi site-to-site communication, which, in his words, still provide only at-least-once semantics (but very robust ones, "close" to exactly-once).
- A few days ago [5], Mark Payne created a JIRA issue for a new feature in the upcoming NiFi 1.15 which will allow "Exactly Once Semantics" (EOS) for stateless pipelines, e.g. from Kafka to Kafka, in NiFi's stateless mode. In the introduction of that story, he writes: "While there are benefits to being able to do so [Implementing exactly once in standard NiFi], the requirements that Kafka puts forth don't really work well with NiFi's architecture." From that statement, I'd deduce that current NiFi doesn't have exactly-once semantics and that it sadly doesn't fit well into NiFi's architecture.

In summary, it seems that regular NiFi (not the upcoming 1.15 stateless mode) doesn't support (end-to-end) exactly-once semantics, am I right? Why doesn't it fit well into NiFi's architecture? From my limited understanding of NiFi (I am currently evaluating it, playing around with it for the first time), it should in general be possible. Each processor works with transactions which can be rolled back or committed and which are persisted to a write-ahead log. So as long as we don't face a hardware failure losing the disk NiFi is running on, exactly-once semantics should be possible between any two processors, and hence, with transactions/two-phase commit in the Kafka producer, fully end-to-end exactly-once?!
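For illustration of what I mean by transactions in the Kafka producer, a minimal sketch with the plain Kafka client (broker, topic and path are placeholders; this gives atomicity for one file's contents, but for end-to-end exactly-once one would still have to track which files were already committed):

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TransactionalFilePublish {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // A stable transactional.id enables the two-phase commit protocol and
            // fences zombie producers after a restart.
            props.put("transactional.id", "file-loader-1");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
                producer.beginTransaction();
                try {
                    for (String line : Files.readAllLines(Paths.get("/data/input.txt"))) {
                        producer.send(new ProducerRecord<>("my-topic", line));
                    }
                    // Either every record becomes visible (to read_committed consumers) or none.
                    producer.commitTransaction();
                } catch (Exception e) {
                    producer.abortTransaction();
                    throw e;
                }
            }
        }
    }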

Best regards 
Theo 

[1] https://nifi.apache.org/docs/nifi-docs/html/overview.html
[2] https://www.youtube.com/watch?v=fblkgr1PJ0o
[3] https://community.cloudera.com/t5/Support-Questions/Can-nifi-promise-each-of-the-flowfiles-can-be-processed/m-p/141655
[4] https://community.cloudera.com/t5/Community-Articles/At-least-once-delivery-vs-exactly-once-delivery-semantics-in/ta-p/244688
[5] https://issues.apache.org/jira/browse/NIFI-9239

Re: Does NiFi provide end-to-end exactly once semantics with kafka sink and replayable source?

Posted by Theo Diefenthal <th...@scoop-software.de>.
Hi Mark, 

Thanks a lot for your explanations.

Sad to hear that NiFi doesn't support Exactly-Once out-of-the-box, but totally understandable. 

I think your argument is not fully complete, though. I might be wrong here, but the following is my understanding:
You write that Kafka EOS works in a way that a transaction is fully handled by the producer. The producer _atomically_ takes care of committing source offsets and committing produced messages (and in Kafka Streams it also commits internal streaming state). You conclude that Kafka EOS only applies when a given Kafka cluster is both source and destination of the data. That's the understanding I also got when I first read the Confluent blog series about exactly-once processing. What Confluent does in those posts is explicitly describe how exactly-once semantics are implemented in a Kafka Streams application (reading from Kafka and writing to the same Kafka). Of course, if they are in "their own world", controlling source, state and sink, they can be more efficient with transactions and exactly-once guarantees, and that's what they describe. But the transaction concept of Kafka applies to a broader range of tools (at least if one doesn't expect _atomic_ updates of source read and sink written simultaneously; I also don't know why someone would need that).
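For the pure Kafka Streams case, this is a single configuration switch; a minimal sketch (application id, broker and topic names are placeholders):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class EosStreamsApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            // One switch: source offsets, state-store changelogs and output records
            // are then committed in a single Kafka transaction.
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");
            new KafkaStreams(builder.build(), props).start();
        }
    }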
When I read more about Apache Flink, I understood that the exactly-once semantics of Kafka are not bound to processing within the Kafka domain only. Kafka producers implement a two-phase commit protocol, and one can utilize that in one's own application. Apache Flink and Apache Spark both promise that: if one has a replayable source (like a Kafka topic) and a sink that is either idempotent or supports two-phase commits, both Flink and Spark provide exactly-once semantics on demand (of course, your processing job must be free of side effects). See [1] for instance.
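The post in [1] demonstrates this with Flink's TwoPhaseCommitSinkFunction; here is a compact file-based sketch along its lines (the class name and paths are made up, and note that commit() must tolerate being retried on recovery):

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.api.common.typeutils.base.VoidSerializer;
    import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

    // Each Flink checkpoint closes one transaction: records go to a scratch file
    // (the "transaction"), preCommit() finalizes it alongside the checkpoint, and
    // commit() atomically renames it into place once the checkpoint completes.
    public class FileTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<String, String, Void> {

        public FileTwoPhaseCommitSink() {
            super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
        }

        @Override
        protected String beginTransaction() throws Exception {
            return Files.createTempFile("txn-", ".pending").toString();
        }

        @Override
        protected void invoke(String txnFile, String value, Context context) throws Exception {
            Files.write(Paths.get(txnFile), (value + "\n").getBytes(), StandardOpenOption.APPEND);
        }

        @Override
        protected void preCommit(String txnFile) throws Exception {
            // Nothing left to flush here; after this point the transaction must be committable.
        }

        @Override
        protected void commit(String txnFile) {
            try {
                // Idempotent on retry: renaming an already-renamed file is a no-op failure we tolerate.
                Files.move(Paths.get(txnFile), Paths.get(txnFile.replace(".pending", ".committed")));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        protected void abort(String txnFile) {
            try {
                Files.deleteIfExists(Paths.get(txnFile));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }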

With a two-phase-commit sink, in the end, it comes down to managing your entire state in one system. Kafka Streams manages all its state in Kafka (and commits atomically to it). In Flink, it is managed in the configured state backend, which can be a standard filesystem, HDFS, S3, whatever...
Let's take a look at a very simple example: reading from one Kafka cluster and writing to another Kafka cluster, with a SQL DB in the middle as the state store (of course, that's not performant):
1. I can read messages from Kafka and write them to a SQL DB. If I manage my consumer offsets myself within the same SQL DB where I store the Kafka records, I can easily make exactly-once guarantees: either I commit a transaction (with the newly consumed records and offsets) atomically, or I abort it. If something fails, I redo the process (hence the replayable source is needed; see the JDBC sketch after this list).
2. More tricky: I read records from the DB, and within another DB transaction, I write a batch of messages to Kafka (in a Kafka transaction); if Kafka succeeds with the pre-commit, I commit my SQL DB transaction with the required info (transactional id, producer id, transaction epoch). With some more communication with the DB, I eventually end up with exactly-once semantics from DB to Kafka (Kafka transactions with two-phase commit: if something crashes after the pre-commit, I can restart the producer, reassign the transactional id, resume the transaction with the producer id and epoch, and retry the commit until it eventually succeeds).
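To make step 1 concrete, a minimal JDBC sketch (connection string, topic and assumed schema are placeholders; the startup logic to seek() each partition back to the DB-stored offset is omitted for brevity):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class KafkaToDbExactlyOnce {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092");
            props.put("group.id", "db-loader");
            props.put("enable.auto.commit", "false"); // offsets live in the DB, not in Kafka
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // Assumed schema: records(payload TEXT) and
            // offsets(topic TEXT, part INT, next_offset BIGINT, PRIMARY KEY (topic, part)).
            try (Connection db = DriverManager.getConnection("jdbc:postgresql://dbhost/mydb");
                 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                db.setAutoCommit(false);
                consumer.subscribe(Collections.singletonList("input-topic"));
                while (true) {
                    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
                    if (batch.isEmpty()) continue;
                    try (PreparedStatement ins = db.prepareStatement(
                             "INSERT INTO records(payload) VALUES (?)");
                         PreparedStatement off = db.prepareStatement(
                             "UPDATE offsets SET next_offset = ? WHERE topic = ? AND part = ?")) {
                        for (ConsumerRecord<String, String> r : batch) {
                            ins.setString(1, r.value());
                            ins.executeUpdate();
                            off.setLong(1, r.offset() + 1);
                            off.setString(2, r.topic());
                            off.setInt(3, r.partition());
                            off.executeUpdate();
                        }
                        db.commit();   // records and offsets become visible atomically
                    } catch (Exception e) {
                        db.rollback(); // nothing persisted; seek back and re-poll to replay
                        throw e;
                    }
                }
            }
        }
    }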
In summary, I designed end-to-end exactly-once from one Kafka cluster to another Kafka cluster. [If we want to introduce some new naming here, we could call that eventual end-to-end exactly-once :) Source and sink are not updated simultaneously (atomically) as in Kafka, but an intermediate store is used to track progress.]
Of course, the devil is in the detail and the implementation of such logic is very tricky. One has to take care of lots of possible failure cases, but technically it's possible. Whether that's possible within the current NiFi architecture is another question, which I can't answer as I don't have enough understanding of NiFi. For instance, I think for two-phase commits, one needs some kind of "coordinator". In Flink, that's the JobManager; in NiFi, I don't know if there is such a thing. I just wanted to add my two cents that, in general, Kafka can provide exactly-once semantics even when not operating fully within a single Kafka cluster but having some other compatible sources and/or sinks involved.

[1] https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

Best regards 
Theo 


Von: "markap14" <ma...@hotmail.com> 
An: "Theo Diefenthal" <th...@scoop-software.de> 
CC: "users" <us...@nifi.apache.org> 
Gesendet: Sonntag, 3. Oktober 2021 16:04:26 
Betreff: Re: Does NiFi provide end-to-end exactly once semantics with kafka sink and replayable source? 

Hey Theo, 

So there’s quite a lot to unpack here. Possibly entire books could be written on some of this stuff. And there’s a lot that is misunderstood about Apache Kafka’s exactly once semantics (EOS). In fact, I’m working on a YouTube video and blog that will cover a lot of this in more detail. 

But the super short version of it is: 

- Apache NiFi does not currently support Kafka's Exactly Once, even though it does support Kafka transactions 
- I don’t know a lot about a Flink. But the way that Kafka’s EOS works is that offsets that are consumed by a consumer are then committed not by the consumer but rather a Kafka Producer. As a result, if there’s some failure or cancellation, the transaction is never completed and everything is rolled back. This is what provides the Exactly Once Semantics. Because of that, Exactly Once semantics ONLY apply when a given Kafka cluster is both the source of the data AND the destination of the data. You cannot guarantee Exactly Once when sending a file from a file system. This is discussed more at [1]. There’s a diagram there that shows this as the “wild west”. 

In terms of what about NiFi’s architecture causes conflicts: 
- NiFi’s components are, by design, loosely coupled. Processor A doesn’t know anything about Processor B. This was done for many reasons and most of the time is exactly what we want and has a lot of advantages. But in order to play in Kafka’s EOS world, you need a tighter coupling - if processing fails or the publisher can’t send data back, the consumer needs to know to rollback its offsets. 
- Secondly, NiFi persists the data across restarts. If NiFi is restarted, without the transaction being committed, it would result in NiFi pulling a second copy of the data on restart, but the data that already was consumed will still be processed, so you’d end up with duplicate (at least once). 
- Finally, we need to take into consideration data ordering guarantees. Specifically, if we have two FlowFiles, one with Kafka offset 10 and one with Kafka offset of 100, then we must NOT send the data produced from offset 100 in a transaction before a different transaction sends the data from offset 10. This becomes difficult to guarantee with NiFi, since the dataflow may split data apart, filter some, introduce many branches that take a long time to complete, etc. 

With the upcoming changes in 1.15.0 (assuming all goes as planned and that the proposed components are all merged, etc.) we will introduce the capabilities necessary. We’ll have the ability to run a dataflow using the Stateless NiFi engine much more easily, and as I said I’ll be putting together a YouTube Video and/or blog that demonstrates all of this. 

Hope this is helpful! 
-Mark 

[1] [ https://www.confluent.io/blog/chain-services-exactly-guarantees/ | https://www.confluent.io/blog/chain-services-exactly-guarantees/ ] 





On Oct 2, 2021, at 8:37 PM, Theo Diefenthal < [ mailto:theo.diefenthal@scoop-software.de | theo.diefenthal@scoop-software.de ] > wrote: 

Hi there, 

At the moment, I'm wondering whether NiFi provides end-to-end exactly once semantics in a place where it should in general be possible (speaking not in terms of NiFi but from a technical viewpoint). My example: Reading a file from filesystem and write the contents of that file to Kafka. Apache Flink for instance can provide this guarantee when having checkpoints enabled and setting up the producer in an exactly once mode (i.e. enable transactions) 

With regards to NiFi, I read some unclear statements: 

- In the NiFi docs [1], I found nothing with regards to exactly-once but only a statement letting me decude at-least-once delivery: "A core philosophy of NiFi has been that even at very high scale, guaranteed delivery is a must. This is achieved through effective use of a purpose-built persistent write-ahead log and content repository." 
- In a NiFi crash course from 2018 by HortonWorks [2], I found a slide with challenges for a DataFlow systems [at time 08:43] where the ""Exactly once" delivery" problem is mentioned twice. As this is a crash course of and advertising NiFi, I think I can/should deduce from this slide that NiFi addresses and solves that exactly-once challenge for me. 
- In the book "Practical Real-time Data Processing and Analytics" from 2017 by Shilpi Saxena und Saurabh Gupta, it is written: "Let's start with NiFi. It is a guaranteed delivery processing engine (exactly once) by default, which maintains write-ahead logs and a content repository to achieve this." 
- In [3], back in 2016 where Kafka didn't yet released version 0.11 which allowed for the first time end-to-end exactly-once semantics in producing, Mark Payne as a lead developer of NiFi wrote that " NiFi generally will guarantee At Least Once delivery of your data ", which is in sync with the statements of the current NiFi docs I'd say. But he also wrote that in general, exactly once semantics can be achieved for a distributed system if the source is replayable and the sink can go along with that (he explicitly mentions deduplication, but I think from a todays perspective, a sink supporting a two phase commit (like kafka) should in general work as well as Flink demonstrates). 
- In [4], in 2017 some community user wrote a knowledge article where he explicitly mentions Two-Phase-Commits in NiFi for Kafka and NiFi Site-to-Site communications which according to his words still provide only at-least-once semantics (But very robust ones, "close" to exactly-once). 
- A few days ago [5], Mark Payne created a JIRA issue for a new feature implemented in upcoming NiFi 1.15 which will allow "Exactly Once Semantics" (EOS) for stateless pipelines e.g. from Kafka to Kafka in NiFi stateless mode. In the introduction of that story, he talks about "While there are benefits to being able to do so [Implementing exactly once in standard NiFi], the requirements that Kafka puts forth don't really work well with NiFi's architecture.". So from that statement, I'd deduce that the current NiFi doesn't have exactly once semantics and that it sadly doesn't fit well into NiFi architecture. 

In summary, it seems that general NiFi (not upcoming 1.15 stateless) doesn't support (End-to-End) Exactly-Once-Semantics, am I right? Why doesn't it fit well to the architecutre of NiFi? From my limited understanding of NiFi (I am currently evaluating it playing around with it for the first time), it should in general be possible. Each processor works with transactions which can be rolled backed or committed and being persisted to a write ahead log. So if we don't face a hardware disk failure losing the hard drive NiFi is running on, exactly-once semantics should be possible between from and to each processor and hence, with transactions/two-phase-commit in the kafka producer, fully end to end exactly once (semantics)?! 

Best regards 
Theo 

[1] [ https://nifi.apache.org/docs/nifi-docs/html/overview.html | https://nifi.apache.org/docs/nifi-docs/html/overview.html ] 
[2] [ https://www.youtube.com/watch?v=fblkgr1PJ0o | https://www.youtube.com/watch?v=fblkgr1PJ0o ] 
[3] [ https://community.cloudera.com/t5/Support-Questions/Can-nifi-promise-each-of-the-flowfiles-can-be-processed/m-p/141655 | https://community.cloudera.com/t5/Support-Questions/Can-nifi-promise-each-of-the-flowfiles-can-be-processed/m-p/141655 ] 
[4] [ https://community.cloudera.com/t5/Community-Articles/At-least-once-delivery-vs-exactly-once-delivery-semantics-in/ta-p/244688 | https://community.cloudera.com/t5/Community-Articles/At-least-once-delivery-vs-exactly-once-delivery-semantics-in/ta-p/244688 ] 
[5] [ https://issues.apache.org/jira/browse/NIFI-9239 | https://issues.apache.org/jira/browse/NIFI-9239 ] 




Re: Does NiFi provide end-to-end exactly once semantics with kafka sink and replayable source?

Posted by Mark Payne <ma...@hotmail.com>.
Hey Theo,

So there’s quite a lot to unpack here. Possibly entire books could be written on some of this stuff. And there’s a lot that is misunderstood about Apache Kafka’s exactly once semantics (EOS). In fact, I’m working on a YouTube video and blog that will cover a lot of this in more detail.

But the super short version of it is:

- Apache NiFi does not currently support Kafka's Exactly Once, even though it does support Kafka transactions
- I don’t know a lot about Flink. But the way that Kafka’s EOS works is that offsets consumed by a consumer are committed not by the consumer but rather by a Kafka producer. As a result, if there’s some failure or cancellation, the transaction is never completed and everything is rolled back. This is what provides the Exactly Once Semantics. Because of that, Exactly Once semantics ONLY apply when a given Kafka cluster is both the source of the data AND the destination of the data. You cannot guarantee Exactly Once when sending a file from a file system. This is discussed more at [1]. There’s a diagram there that shows this as the “wild west”.
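Concretely, Kafka’s consume-transform-produce pattern looks something like this (a minimal sketch, assuming Kafka 2.5+ for the groupMetadata() overload; broker and topic names are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class ConsumeTransformProduce {
        public static void main(String[] args) {
            Properties cProps = new Properties();
            cProps.put("bootstrap.servers", "broker:9092");
            cProps.put("group.id", "eos-demo");
            cProps.put("enable.auto.commit", "false");       // the PRODUCER commits offsets
            cProps.put("isolation.level", "read_committed");
            cProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            cProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            Properties pProps = new Properties();
            pProps.put("bootstrap.servers", "broker:9092");
            pProps.put("transactional.id", "eos-demo-producer-1");
            pProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            pProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
                consumer.subscribe(Collections.singletonList("in-topic"));
                producer.initTransactions();
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    if (records.isEmpty()) continue;
                    producer.beginTransaction();
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> r : records) {
                        producer.send(new ProducerRecord<>("out-topic", r.key(), r.value().toUpperCase()));
                        offsets.put(new TopicPartition(r.topic(), r.partition()),
                                    new OffsetAndMetadata(r.offset() + 1));
                    }
                    // The consumed offsets ride in the SAME transaction as the produced records.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                }
            }
        }
    }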

In terms of what about NiFi’s architecture causes conflicts:
- NiFi’s components are, by design, loosely coupled. Processor A doesn’t know anything about Processor B. This was done for many reasons, is most of the time exactly what we want, and has a lot of advantages. But in order to play in Kafka’s EOS world, you need tighter coupling - if processing fails or the publisher can’t send the data, the consumer needs to know to roll back its offsets.
- Secondly, NiFi persists data across restarts. If NiFi is restarted without the transaction having been committed, NiFi would pull a second copy of the data on restart, but the data that was already consumed would still be processed, so you’d end up with duplicates (at least once).
- Finally, we need to take into consideration data ordering guarantees. Specifically, if we have two FlowFiles, one with Kafka offset 10 and one with Kafka offset of 100, then we must NOT send the data produced from offset 100 in a transaction before a different transaction sends the data from offset 10. This becomes difficult to guarantee with NiFi, since the dataflow may split data apart, filter some, introduce many branches that take a long time to complete, etc.

With the upcoming changes in 1.15.0 (assuming all goes as planned and that the proposed components are all merged, etc.) we will introduce the capabilities necessary. We’ll have the ability to run a dataflow using the Stateless NiFi engine much more easily, and as I said I’ll be putting together a YouTube Video and/or blog that demonstrates all of this.

Hope this is helpful!
-Mark

[1] https://www.confluent.io/blog/chain-services-exactly-guarantees/

