Posted to users@nifi.apache.org by Aian Cantabrana <ac...@zylk.net> on 2022/12/02 14:54:34 UTC

Re: Exactly once from NiFi to Kafka

Hi 

I did a deeper analysis of the Data Provenance, the NiFi logs and the Kafka logs. I will explain my NiFi flow and the observed behaviour again: 

NiFi version 1.18.0 deployed on Windows 7 (single node). 
Kafka version 2.7.0 (3 brokers). 

NiFi flow: 



All queues with FirstInFirstOutPrioritizer and each processor with the following configuration: 

- ConsumeAMQP: 


- PublishKafka_2_6: 

- PublishKafka_2_6_normal: 


Both PublishKafka processors are sending data to the same Kafka cluster (3 nodes), to two different topics, each with one partition and replication factor 2. Outgoing FlowFiles from the ConsumeAMQP processor contain a single JSON per FlowFile and are 65 bytes in size. With this configuration, from time to time, we receive duplicated messages in TOPIC_1. 
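For reference, an equivalent topic setup expressed with the Kafka AdminClient would look something like this (just a sketch; the broker addresses are placeholders, not our real ones): 

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.List;
    import java.util.Properties;

    public class CreateTopicsSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder broker list; our real cluster has 3 brokers.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
            try (Admin admin = Admin.create(props)) {
                // One partition each (to preserve ordering) and replication factor 2.
                admin.createTopics(List.of(
                        new NewTopic("TOPIC_1", 1, (short) 2),
                        new NewTopic("TOPIC_2", 1, (short) 2)
                )).all().get();
            }
        }
    }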

That being said, I analyzed one batch of duplicated messages we received between 11/28/2022 07:15:30 GMT and 11/28/2022 07:15:32 GMT. Here are the results: 

1. Kafka logs show no rebalancing or repartitioning of the topic in that period of time. 
2. NiFi logs show no error, warning or any other indicator that something went wrong in that period of time. 
3. Data Provenance shows that those messages were not duplicated in the AMQP broker nor at any point of the NiFi flow, BUT the duplicate messages seen in Kafka have a Lineage Duration of nearly 40 seconds (attached is the Data Provenance analysis). 

My conclusion is that some messages are not receiving the expected ACK from Kafka, so NiFi resends them. But this raises some questions for me: 

1. First of all, if Acknowledgment Wait Time is set to 5 secs, it does not make sense that messages have such a large Lineage Duration (40 secs). What is going on during this time? 
2. If an ACK is not received from Kafka, shouldn't these flowfiles be routed to failure and, consequently, an error be written to the logs? Is the Rollback failure strategy hiding this ACK timeout error? 
3. If a message is sent twice by the same producer to Kafka, shouldn't the idempotence property handle it so that no duplicated message is written to the Kafka topic? (See the producer sketch right below.) 
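To put questions 1 and 3 in plain Kafka client terms, this is how I understand the retry behaviour (a sketch; the mapping of the processor's Acknowledgment Wait Time to request.timeout.ms and the broker list are my assumptions, not something I have verified in the processor code): 

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class AckTimeoutSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.ACKS_CONFIG, "all");                 // Guarantee Replicated Delivery
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");  // dedup of retries within this producer session
            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");  // my assumption: Acknowledgment Wait Time = 5 secs
            // The client keeps retrying a batch until delivery.timeout.ms (default 120000 ms)
            // expires, so several re-sends can happen before anything is reported as a failure;
            // the ~40 s lineage duration would fit inside that window.

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("TOPIC_1", "{\"example\":\"json\"}"), (metadata, exception) -> {
                    if (exception != null) {
                        // Reached once the send finally fails (e.g. after retries are exhausted);
                        // with idempotence enabled the retries themselves should not create
                        // duplicates within the same producer session.
                        System.err.println("Send failed after retries: " + exception);
                    } else {
                        System.out.println("Acked at offset " + metadata.offset());
                    }
                });
            }
        }
    }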

Some last considerations: 

- We are using TOPIC_2 just as a backup and we are not aware whether it contains any duplicated messages. 
- From what I have understood after reading the documentation, setting Transactions to true and Delivery Guarantee to Guarantee Replicated Delivery implies the use of an idempotent producer, and since the default value of max.in.flight.requests.per.connection is 5, is there any real difference between these two processors? (A sketch of the transactional producer I have in mind follows below.) 
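For the last point, this is the kind of transactional producer I have in mind, in plain Java (a sketch under the assumption that Transactions = true maps to the standard transactional producer API; the transactional.id and broker list are made up): 

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class TransactionalProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Setting a transactional.id implicitly enables idempotence and acks=all.
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "nifi-publish-kafka-example"); // made-up id

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
                try {
                    producer.beginTransaction();
                    producer.send(new ProducerRecord<>("TOPIC_1", "{\"example\":\"json\"}"));
                    producer.commitTransaction();
                } catch (KafkaException e) {
                    // Simplified: abortable errors end up here; fatal ones
                    // (e.g. ProducerFencedException) would require closing the producer instead.
                    producer.abortTransaction();
                }
            }
        }
    }

As far as I know, setting a transactional.id implicitly enables idempotence and acks=all, which is why I wonder whether the two processors end up behaving the same way. 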


Thank you in advance, 

Aian 



De: "Pierre Villard" <pi...@gmail.com> 
Para: "users" <us...@nifi.apache.org> 
CC: "Aian Cantabrana" <ac...@zylk.net>, "Joe Witt" <jo...@gmail.com> 
Enviados: Sábado, 19 de Noviembre 2022 20:21:45 
Asunto: Re: Exacly once from NiFi to Kafka 

You don't need to run the Kafka processor only on the primary node. The Kafka client will take care of doing the proper partition-consumer assignment across the nodes and threads in the NiFi cluster. 

We're aware of many NiFi users consuming data from Kafka at very large scale without any duplicate issue. 

As mentioned before, I'd recommend looking at the provenance data for all generated flow files to really understand where there may be duplication. I'd also look at the Kafka logs and search for rebalancing / reassignment of consumers / partitions. 

Pierre 

On Sat, Nov 19, 2022, 19:33, Joe Obernberger <joseph.obernberger@gmail.com> wrote: 





Are you by chance using a clustered NiFi? I'm seeing duplicate messages if I run the consumer on multiple NiFi nodes, so I've started running the consumer only on the primary node. This seems to correct the issue, but leads to other problems. I'd love a solution. 

-Joe 
On 11/16/2022 3:50 AM, Aian Cantabrana wrote: 


Hi Joe, 

Thanks for the reply. The actual flow is sending data from the ConsumeAMQP processor to two different PublishKafka processors, one with idempotence and the other with the default config. Each of them sends the same data to a different topic, and comparing both topics is how I am checking for duplicates. It seems to be random: sometimes the duplicates appear in the "normal" processor's topic and sometimes in the "idempotence" one; I did not find any pattern. (A sketch of that kind of check is below.) 
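For illustration, a sketch of this kind of duplicate check, consuming one topic from the beginning and counting identical payloads (the group id and broker list are placeholders; in practice I compare the two topics against each other): 

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class DuplicateCheckSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "duplicate-check"); // made-up group id
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            Map<String, Integer> counts = new HashMap<>();
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("TOPIC_1"));
                // Poll a few times from the beginning of the topic and count identical payloads.
                for (int i = 0; i < 10; i++) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                    for (ConsumerRecord<String, String> record : records) {
                        counts.merge(record.value(), 1, Integer::sum);
                    }
                }
            }
            counts.forEach((value, count) -> {
                if (count > 1) {
                    System.out.println("Duplicated " + count + " times: " + value);
                }
            });
        }
    }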

I will upgrade to NiFi 1.18.0 and try again. 

In any case, the messages are in JSON format (one JSON per flowfile), but since I am sending and storing them in Kafka as plain text I am using the non-record-oriented Kafka publisher. Is PublishKafkaRecord more reliable? Would it be better to use it? 

Thanks, 

Aian 


De: "Joe Witt" [ mailto:joe.witt@gmail.com | <jo...@gmail.com> ] 
Para: "users" [ mailto:users@nifi.apache.org | <us...@nifi.apache.org> ] 
Enviados: Martes, 15 de Noviembre 2022 17:31:54 
Asunto: Re: Exacly once from NiFi to Kafka 

Aian, 
How can you tell there are duplicates in Kafka and are you certain that no duplicates exist in the source topic? 

Given NiFi's data provenance capabilities you should be able to pinpoint a given duplicate and figure out whether it happened at the source, in NiFi, or otherwise. 

Note that much has changed/improved since the 1.12.x line of NiFi, so we have more Kafka components and record-oriented mechanisms. But I'm still pretty sure that even in your version we should not be duplicating data unless the flow is configured such that it would happen. 

Thanks 

On Tue, Nov 15, 2022 at 9:25 AM Aian Cantabrana <acantabrana@zylk.net> wrote: 


Hi, 

I am having some difficulties trying to get exactly-once semantics while ensuring data order from NiFi to Kafka. I have read the Kafka documentation and it should be quite straightforward using an idempotent producer from NiFi and a Kafka topic with a single partition, but I am still getting some duplicated messages in Kafka. 
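By "idempotent producer" I mean, in plain Kafka client terms, roughly the following (a minimal sketch; the broker list and topic name are placeholders, and the exact properties the NiFi processor sets internally are my assumption): 

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class ExactlyOnceProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // broker drops re-sent batches it has already seen
            props.put(ProducerConfig.ACKS_CONFIG, "all");

            // With a single-partition topic, ordering is preserved as long as idempotence is on
            // and max.in.flight.requests.per.connection stays at its default of 5 or below.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("my-single-partition-topic", "small 60 byte json message"));
            }
        }
    }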

NiFi version: 1.12.1 
Kafka version: 2.7.0 

NiFi flow: 
(Both queues shown with the FIFO prioritizer) 

PublishKafka_2_6 configuration: 

As I said, the target Kafka topic has just one partition to ensure data order. 

Incoming flowfiles are small, 60-byte messages. 

I have been working on this for a while, so any suggestion is really welcome. 

Thanks in advance, 

Aian 







