Posted to user@flink.apache.org by Antoine Philippot <an...@teads.tv> on 2017/10/02 13:35:06 UTC

Avoid duplicate messages while restarting a job for an application upgrade

Hi,

I'm working on a Flink streaming app with a kafka09 to kafka09 use case
which handles around 100k messages per second.

To upgrade our application, we have been running a flink cancel with
savepoint command followed by a flink run with that savepoint and the new
application fat jar as parameters. We noticed that this can produce more
than 50k duplicated messages in the Kafka sink, which is not idempotent.

This behaviour is currently a problem for this project, and I am trying to
find a solution or workaround to avoid these duplicated messages.

The JobManager clearly indicates that the cancel call is triggered once the
savepoint has finished, but while the savepoint is being taken the Kafka
source continues to poll new messages, which are not part of the savepoint
and are replayed on the next application start.
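
To make the mechanism above concrete, here is a minimal sketch in plain Java (no Flink dependency). The class and method names are hypothetical and the offsets are made-up illustration values; only the rate of 100k messages per second and the figure of 50k duplicates come from the post. It assumes the sink writes every record the source emits, so records emitted between the offset stored in the savepoint and the cancel are written once before the restart and once after it.

```java
/** Hypothetical model of the replay window described above; not Flink code. */
class ReplayWindow {
    /** Records emitted after the savepoint's offset but before the cancel are written twice. */
    static long duplicates(long savepointOffset, long offsetAtCancel) {
        return Math.max(0L, offsetAtCancel - savepointOffset);
    }

    /** How long the source keeps polling after the snapshot to produce that many duplicates. */
    static double windowSeconds(long duplicates, long recordsPerSecond) {
        return (double) duplicates / recordsPerSecond;
    }

    public static void main(String[] args) {
        long savepointOffset = 1_000_000L; // illustrative offset stored in the savepoint
        long offsetAtCancel = 1_050_000L;  // illustrative offset reached when the job is cancelled
        long dup = duplicates(savepointOffset, offsetAtCancel);
        // prints: 50000 duplicates, window of 0.5 s
        System.out.println(dup + " duplicates, window of " + windowSeconds(dup, 100_000L) + " s");
    }
}
```

At 100k messages per second, a gap of only half a second between the snapshot and the cancel is enough to produce 50k duplicates.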

I tried to find a solution with the stop command-line argument, but the
Kafka source doesn't implement StoppableFunction (
https://issues.apache.org/jira/browse/FLINK-3404) and, unlike cancel, stop
cannot generate a savepoint.

Is there another way to avoid processing duplicated messages on each
application upgrade or rescaling?

If not, has someone planned to implement it? Otherwise, I can propose a pull
request after some architecture advice.

The final goal is to stop polling the source and trigger a savepoint once
polling has stopped.

Thanks

Re: Avoid duplicate messages while restarting a job for an application upgrade

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Sorry for joining the discussion somewhat late. I have commented on the issue you created; please have a look.

Best,
Aljoscha

Re: Avoid duplicate messages while restarting a job for an application upgrade

Posted by Antoine Philippot <an...@teads.tv>.
Hi Piotrek,

I am coming back to you with a Jira ticket that I created and a proposal:
the ticket: https://issues.apache.org/jira/browse/FLINK-7883
the proposal:
https://github.com/aphilippot/flink/commit/9c58c95bb4b68ea337f7c583b7e039d86f3142a6

I am open to any comments or suggestions.

Antoine

Re: Avoid duplicate messages while restarting a job for an application upgrade

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi, 

That’s good to hear :)

I quickly went through the code and it seems reasonable. I think we might need to think a little more about how this cancel checkpoint should be exposed to the operators and what the default action should be. Right now the cancel flag is ignored by default; I would like to consider whether throwing an UnsupportedOperationException would be a better long-term solution.

But at first glance I do not see any larger issues, and it would be great if you could make a pull request out of it.

Piotrek

Re: Avoid duplicate messages while restarting a job for an application upgrade

Posted by Antoine Philippot <an...@teads.tv>.
Thanks for your advice, Piotr.

Firstly, yes, we are aware that even with a clean shutdown we can end up
with duplicated messages after a crash, and that is acceptable as it is rare
and unintentional, unlike deploying new business code or scaling up/down.

I made a fork of version 1.2.1, which we currently use, and developed a
simple POC that passes a boolean stopSourceSavepoint from the job manager
to the source when a cancel with savepoint is triggered.
This is the altered code:
https://github.com/aphilippot/flink/compare/release-1.2.1...aphilippot:flink_1_2_1_POC_savepoint
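
The idea behind the POC can be illustrated with a small plain-Java model. This is not the code from the fork linked above: the class StoppableSource and its methods are hypothetical, and only the flag name stopSourceSavepoint comes from the message. The sketch assumes that emitting a record and taking a snapshot are serialized by a shared lock, so that once a snapshot carrying the flag has recorded an offset, the source emits nothing beyond it.

```java
import java.util.ArrayList;
import java.util.List;

/** Demo driver for the hypothetical source model below. */
class StoppableSourceDemo {
    public static void main(String[] args) {
        List<Long> sink = new ArrayList<>();
        StoppableSource source = new StoppableSource(sink);
        source.pollOnce();                   // emits offset 0
        source.pollOnce();                   // emits offset 1
        long saved = source.snapshot(true);  // savepoint that also stops the source
        source.pollOnce();                   // no effect: polling has stopped
        // prints: saved=2 emitted=[0, 1]
        System.out.println("saved=" + saved + " emitted=" + sink);
    }
}

/** Hypothetical source that stops emitting once a "stop" savepoint has been taken. */
class StoppableSource {
    private final Object checkpointLock = new Object();
    private final List<Long> out;
    private long nextOffset = 0;
    private boolean stopped = false;

    StoppableSource(List<Long> out) {
        this.out = out;
    }

    /** Emits one record unless a stopping savepoint was taken; returns whether it emitted. */
    boolean pollOnce() {
        synchronized (checkpointLock) {
            if (stopped) {
                return false;
            }
            out.add(nextOffset++);
            return true;
        }
    }

    /** Records the next offset to read; with the flag set, nothing is emitted afterwards. */
    long snapshot(boolean stopSourceSavepoint) {
        synchronized (checkpointLock) {
            if (stopSourceSavepoint) {
                stopped = true;
            }
            return nextOffset;
        }
    }
}
```

Because the saved offset equals the number of records already emitted, a restart from that offset replays nothing that the sink has already written.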

We tested it with our production workload and there are no duplicated
messages any more, whereas hundreds of thousands were duplicated before.

I plan to reapply/adapt this patch for the 1.3.2 release when we migrate
to it, and maybe later for 1.4.

I'm open to suggestions, or to helping develop this feature upstream if you
want.


Re: Avoid duplicate messages while restarting a job for an application upgrade

Posted by Piotr Nowojski <pi...@data-artisans.com>.
We are planning to work on this clean shutdown after releasing Flink 1.4. Implementing this properly would require some work, for example:
- adding some checkpoint options to carry information about a “closing”/“shutting down” event
- adding clean shutdown to the source functions API
- implementing handling of this clean shutdown in the desired sources

Those are not super complicated changes, but also not trivial.

One thing you could do is implement a super hacky filter function right after the source operator that you would trigger manually. Normally it would pass all messages through. Once triggered, it would wait for the next checkpoint, assume that it is a savepoint, and start filtering out all subsequent messages. When this checkpoint completes, you could manually shut down your Flink application. This could guarantee that there are no duplicated writes after a restart. It might work for a clean shutdown, but it would be a very hacky solution.
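
A minimal sketch of such a gate, written as plain Java so that it runs without Flink on the classpath. All names (SavepointGate, arm, onSnapshot, onCheckpointComplete) are hypothetical. In a real job, the assumption is that filter would be the body of a FilterFunction, onSnapshot would be called from CheckpointedFunction#snapshotState, and onCheckpointComplete from CheckpointListener#notifyCheckpointComplete; how arm() gets triggered from outside the job is left open here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Demo driver: records before the savepoint pass, records after it are dropped. */
class SavepointGateDemo {
    public static void main(String[] args) {
        SavepointGate<String> gate = new SavepointGate<>();
        List<String> sink = new ArrayList<>();

        for (String r : Arrays.asList("a", "b")) {
            if (gate.filter(r)) sink.add(r);
        }
        gate.arm();                           // operator decides to shut down
        if (gate.filter("c")) sink.add("c");  // still before the snapshot: passes
        gate.onSnapshot();                    // the next checkpoint reaches the filter
        if (gate.filter("d")) sink.add("d");  // after the snapshot: dropped
        gate.onCheckpointComplete();
        // prints: [a, b, c] safeToCancel=true
        System.out.println(sink + " safeToCancel=" + gate.isSafeToCancel());
    }
}

/** Hypothetical plain-Java model of the manually triggered filter; not a Flink class. */
class SavepointGate<T> {
    private enum State { OPEN, ARMED, CLOSED, DONE }

    private State state = State.OPEN;

    /** Manual trigger: the next snapshot is assumed to be the final savepoint. */
    void arm() {
        if (state == State.OPEN) state = State.ARMED;
    }

    /** Called when a checkpoint snapshot is taken at this operator. */
    void onSnapshot() {
        if (state == State.ARMED) state = State.CLOSED;
    }

    /** Called when the checkpoint has completed; the job can then be cancelled. */
    void onCheckpointComplete() {
        if (state == State.CLOSED) state = State.DONE;
    }

    /** Passes records until the armed snapshot has been taken, then drops everything. */
    boolean filter(T record) {
        return state == State.OPEN || state == State.ARMED;
    }

    boolean isSafeToCancel() {
        return state == State.DONE;
    }
}
```

Records dropped after the snapshot are not lost: their offsets lie beyond those stored in the savepoint, so the source reads them again after the restart. With a parallelism greater than one, every parallel instance would presumably have to be armed before the same checkpoint, otherwise some instances keep forwarding records after the savepoint.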

Btw, keep in mind that even with a clean shutdown you can end up with duplicated messages after a crash, and there is no way around this with Kafka 0.9.

Piotrek

Re: Avoid duplicate messages while restarting a job for an application upgrade

Posted by Antoine Philippot <an...@teads.tv>.
Thanks for your answer, Piotr. Sadly, we can't use Kafka 0.11 for now (and
won't be able to for a while).

We cannot afford tens of thousands of duplicated messages on each
application upgrade. Can I help by working on this feature?
Do you have any hints or details on this part of that "todo list"?


Re: Avoid duplicate messages while restarting a job for an application upgrade

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

For failure recovery with Kafka 0.9 it is not possible to avoid duplicated messages. Using Flink 1.4 (not yet released) combined with Kafka 0.11, it will be possible to achieve exactly-once end-to-end semantics when writing to Kafka. However, this is still a work in progress:

https://issues.apache.org/jira/browse/FLINK-6988

However, this is a superset of the functionality you are asking for. Exactly-once just for clean shutdowns is also on our “TODO” list (it would/could support Kafka 0.9), but it is not currently being actively developed.

Piotr Nowojski
