Posted to user@flink.apache.org by "F.Amara" <fa...@wso2.com> on 2017/05/24 14:51:47 UTC

Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

Hi all,

I'm working with Flink 1.2.0, Kafka 0.10.0.1 and Hadoop 2.7.3.

I have a Flink Highly Available cluster that reads data from a Kafka
producer and processes it within the cluster. I randomly kill a Task
Manager to introduce a failure. A restart strategy is configured, and the
cluster does restart processing after a slight delay, which is expected.
But when I check the output after the final processing is done, I see
duplicates (when sending 4200 events with a 40 ms delay between them, I
observed 56 duplicates). As described in [1], I have configured externalized
checkpoints but still observe duplicates.

Even when I tested with manual savepoints (cancelled the job and restarted it
from the savepoint), 2 or 3 duplicates appeared!

Can someone explain how I can use the checkpoint created through
externalized checkpoints to make the application resume processing exactly
from where it left off? I need the application to read the checkpoint
details and recover from that point onwards automatically rather than my
doing it manually.
Or are regular savepoints capable of providing the same functionality
automatically?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html 
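
For reference, this is not the actual job code, only a minimal sketch of how a
restart strategy and externalized checkpoints are typically enabled with the
Flink 1.2 DataStream API; the checkpoint interval, retry settings, and job name
are illustrative:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExternalizedCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Draw a checkpoint barrier every 10 seconds with exactly-once state guarantees.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

        // Retain the checkpoint metadata when the job is cancelled so the job
        // can later be resumed from it.
        env.getCheckpointConfig()
           .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Restart up to 3 times, waiting 10 seconds between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10_000L));

        // ... Kafka source, processing, and sink are built here ...
        env.execute("kafka-processing-job");
    }
}

Resuming from a retained (externalized) checkpoint works like resuming from a
savepoint, e.g. bin/flink run -s <path-to-externalized-checkpoint> ..., where
the path points at the metadata written under the configured checkpoint
directory.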

Thanks,
Amara




Re: Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Amara,
please refer to [1] for details about our checkpointing mechanism. In short,
for your situation:

* checkpoints are made at certain checkpoint barriers,
* in between those barriers, processing continues and so do the outputs,
* in case of a failure, the state at the latest completed checkpoint is restored,
* processing then restarts from there and you will see the same outputs again.

You seem not to deliver to Kafka but only consume from it and write to a CSV
file. If this write were transactional, you would commit only at each checkpoint
barrier and would never see the "duplicate", i.e. uncommitted, events.
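
For illustration, the following is not production code but a deliberately
simplified sketch of that "commit only at the checkpoint barrier" idea, assuming
Flink 1.2/1.3 APIs. Records are buffered per checkpoint and appended to the CSV
file only once that checkpoint has completed, so records replayed after a failure
are not written a second time. A robust version would also keep the buffered
records in Flink managed state (as Flink's write-ahead and bucketing sinks do);
the class name and file path are placeholders.

import java.io.FileWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CommitOnCheckpointCsvSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private final String path;                                   // target CSV file
    private final List<String> pending = new ArrayList<>();      // since the last barrier
    private final Map<Long, List<String>> awaitingCommit = new HashMap<>();

    public CommitOnCheckpointCsvSink(String path) {
        this.path = path;
    }

    @Override
    public void invoke(String value) {
        pending.add(value);                                      // buffer, do not write yet
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) {
        // Checkpoint barrier: everything buffered so far belongs to this checkpoint.
        awaitingCommit.put(ctx.getCheckpointId(), new ArrayList<>(pending));
        pending.clear();
        // A robust sink would also persist these records in managed operator state here.
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) {
        // Nothing is restored in this simplified sketch.
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // The checkpoint is complete, so its batch can safely be "committed" to the file.
        List<String> batch = awaitingCommit.remove(checkpointId);
        if (batch == null) {
            return;
        }
        try (FileWriter out = new FileWriter(path, true)) {
            for (String record : batch) {
                out.write(record + "\n");
            }
        }
    }
}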

Regards,
Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html



Re: Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

Posted by "F.Amara" <fa...@wso2.com>.
Hi Robert,

I have a few more questions to clarify.

1) Why do you say that printing the values to standard out would display
duplicates even if exactly-once works? What is the reason for this? Could
you brief me on this?

2) I observed duplicates (by writing to a file) starting from the
FlinkKafkaConsumer onwards. Why does this component introduce duplicates? Is
it because Kafka guarantees only at-least-once delivery at the moment?

Thanks,
Amara




Re: Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

Posted by "F.Amara" <fa...@wso2.com>.
Hi Robert,

Thanks a lot for the reply. 

To further explain how I verify the presence of duplicates: I write the
output stream received at the FlinkKafkaConsumer (after being sent from the
KafkaProducer) to a CSV file.
The content of the file is then scanned to check whether we received the exact
number of events sent from the KafkaProducer, and to look for values that
appear more than once, indicating duplicates.
In my case the total number of events received is always higher than what we
sent.

The following diagram explains the procedure.

|----------------------------------|       |-------------------|       
|---------------------------------|
|	      KafkaProducer	         |-------->|	      Kafka	     |------>|	 
FlinkKafkaConsumer	  |
|(A separate Java process|	          |                       |      |	  
(Starting point of	        |
|  which generates data     |	          |                       |      |	  
Flink application)	        |
|  and writes to Kafka)       |           |                       |      | 	                                         
|
|----------------------------------|           |-------------------|     
|------------------------------------|
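
For completeness, the offline check itself is roughly of the following shape;
this is not the actual test code, just a sketch that counts how often each event
value appears in the CSV file, assuming one event per line and an illustrative
file name:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class DuplicateCheck {
    public static void main(String[] args) throws Exception {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : Files.readAllLines(Paths.get("output.csv"))) {
            counts.merge(line, 1, Integer::sum);      // count occurrences per event value
        }
        long total = counts.values().stream().mapToLong(Integer::longValue).sum();
        long duplicated = counts.values().stream().filter(c -> c > 1).count();
        System.out.println("events received: " + total
                + ", values seen more than once: " + duplicated);
    }
}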


Thanks,
Amara




Re: Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

Posted by Robert Metzger <rm...@apache.org>.
Hi Amara,
how are you validating whether you have duplicates in your output or not?

If you are just writing the output to another Kafka topic or printing it to
standard out, you'll see duplicates even if exactly-once works.
Flink does not provide exactly-once delivery; it provides exactly-once
semantics for registered state.

This means you need to cooperate with the system to achieve exactly-once
results. For files, for example, you need to remove invalid data from previous
failed checkpoints. Our bucketing sink does that.
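
For illustration, a minimal sketch (not from the job in question) of using the
bucketing sink from flink-connector-filesystem instead of writing the CSV file
directly, assuming Flink 1.2/1.3; the base path, bucket format, and batch size
are placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class BucketingSinkExample {
    public static void addCsvSink(DataStream<String> lines) {
        // Write records into time-bucketed part files under the base path.
        BucketingSink<String> sink = new BucketingSink<>("hdfs:///flink/output");
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
        sink.setBatchSize(1024L * 1024L * 128L);   // roll part files at ~128 MB
        lines.addSink(sink);
    }
}

On recovery the bucketing sink truncates part files back to the length recorded
in the last successful checkpoint (or writes a valid-length marker), which is
the "remove invalid data from previous failed checkpoints" step mentioned above.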



Re: Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

Posted by "F.Amara" <fa...@wso2.com>.
Hi Gordon,

Thanks a lot for the reply.
The events are produced using a KafkaProducer, submitted to a topic, and then
consumed by the Flink application using a FlinkKafkaConsumer. I verified that
during a failure-recovery scenario (of the Flink application) the KafkaProducer
was not interrupted, so no duplicated values were sent from the data source. I
observed the output from the FlinkKafkaConsumer and noticed duplicates starting
from that point onwards.
Can the FlinkKafkaConsumer introduce duplicates?

How can I implement exactly-once processing for my application? Could you
please guide me on what I might have missed?

Thanks,
Amara





Re: Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Amara,

Could you elaborate in a bit more detail on your job? How are you producing the 4200 events into Kafka? Is that a separate process from the consuming job?
Do note that sending data to a Kafka topic currently has only at-least-once delivery guarantees, so if you're sending the data to the Kafka topic as part of the failing job, it is likely that there will be duplicates in the topic.

Also, how are you verifying exactly-once? As I explained, Kafka producing has only at-least-once delivery guarantees, so checking for topic-to-topic duplicates is currently not a valid way to verify this. To properly verify it, I would suggest having a stateful operator that counts the number of events it has processed and registers that count as Flink managed state; see the sketch below. That count should always be 4200 upon cancelling and restarting the job from savepoints.
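
The following is only an illustrative sketch of such a counting operator (not
code from this thread), using the Flink 1.3-style CheckpointedFunction API for
managed operator state; the class and state names are placeholders:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class CountingPassThrough<T> implements MapFunction<T, T>, CheckpointedFunction {

    private transient ListState<Long> checkpointedCount;  // Flink managed operator state
    private long count;                                    // local working copy

    @Override
    public T map(T value) {
        count++;                                           // count every processed event
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedCount = ctx.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("event-count", Long.class));
        if (ctx.isRestored()) {
            for (Long c : checkpointedCount.get()) {
                count += c;                                // sum partial counts after a restore
            }
        }
    }
}

With this chained right after the FlinkKafkaConsumer, the restored count (plus
newly processed events) should end up at exactly 4200 if exactly-once state
semantics hold, regardless of duplicates visible in non-transactional output.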

How you're creating the savepoint, and whether or not it is externalized, is irrelevant here; the exactly-once state guarantees should still hold.

Cheers,
Gordon
