Posted to user@flink.apache.org by Niels Basjes <Ni...@basjes.nl> on 2016/02/05 11:17:09 UTC

How to ensure exactly-once semantics in output to Kafka?

Hi,

It is my understanding that the exactly-once semantics regarding the input
from Kafka is based on the checkpointing in the source component retaining
the offset where it was at the checkpoint moment.

My question is how does that work for a sink? How can I make sure that (in
light of failures) each message that is read from Kafka (my input) is
written to Kafka (my output) exactly once?


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Chesnay Schepler <ch...@apache.org>.
Essentially what happens is the following:

Between checkpoints, all incoming data is stored in the operator 
state.

When a checkpoint-complete notification arrives, the data is read from the 
operator state and written into Kafka (or any other system).

If the job fails while storing records in the state, the current state 
is discarded and we go back to the previous one. Since no data was 
written yet, we fulfill exactly-once here.
If the job fails while data is being written into Cassandra (it can't be 
written as one atomic action), some data will persist in Cassandra and 
will be sent again upon restart. In this case exactly-once is not fulfilled.
But we minimize the time frame in which a failure causes exactly-once to 
fail, which is pretty much as close as you can get without support from 
Kafka or other target systems.

@Niels we discussed having a counter that tells us how much data was 
written in between checkpoints. But this is currently not possible: an 
operator can't update its state on the fly, so we would need something 
new here.
And there would still be cases where even this would fail, for example 
if the job fails after the message was sent but before the ID was saved.
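
The behaviour described above can be sketched with a toy model. This is only an illustration under stated assumptions, not the prototype itself: the class BufferingSink, its method names, and the fail_after parameter are hypothetical, and a plain Python list stands in for the target system.

```python
class BufferingSink:
    """Toy model of a sink that holds records until a checkpoint completes."""

    def __init__(self, external):
        self.external = external   # stands in for Kafka or any target system
        self.pending = []          # records received since the last checkpoint
        self.snapshots = {}        # checkpoint id -> records captured by it

    def invoke(self, record):
        # Nothing leaves the job here; the record is only staged in state.
        self.pending.append(record)

    def snapshot_state(self, checkpoint_id):
        # Everything staged so far becomes part of this checkpoint.
        self.snapshots[checkpoint_id] = self.pending
        self.pending = []

    def notify_checkpoint_complete(self, checkpoint_id, fail_after=None):
        # Write the records of the completed checkpoint to the target system.
        # fail_after simulates a crash partway through the non-atomic write.
        for i, record in enumerate(self.snapshots[checkpoint_id]):
            if fail_after is not None and i == fail_after:
                raise RuntimeError("job failed while writing")
            self.external.append(record)
        del self.snapshots[checkpoint_id]


def run_demo():
    # Case 1: failure while buffering. Nothing was written, so the replay
    # after restart cannot produce duplicates.
    out = []
    sink = BufferingSink(out)
    sink.invoke("a")
    sink.invoke("b")
    sink = BufferingSink(out)      # crash: uncheckpointed state is discarded
    for record in ["a", "b"]:      # the source replays the same records
        sink.invoke(record)
    sink.snapshot_state(1)
    sink.notify_checkpoint_complete(1)
    case1 = list(out)

    # Case 2: failure during the write. Some records are already in the
    # target system and are sent again when the write is retried.
    out = []
    sink = BufferingSink(out)
    for record in ["a", "b", "c"]:
        sink.invoke(record)
    sink.snapshot_state(1)
    try:
        sink.notify_checkpoint_complete(1, fail_after=2)
    except RuntimeError:
        pass
    sink.notify_checkpoint_complete(1)   # retry after restart
    case2 = list(out)
    return case1, case2
```

In this sketch run_demo() yields no duplicates for the first case and duplicated "a" and "b" for the second, which matches the two failure windows described above.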

On 05.02.2016 13:55, Paris Carbone wrote:
> This is not a bad take. It still makes a few assumptions
>
> 1) the output checkpoints the id of the last *known* ID that was 
> *persisted* in kafka (not just pushed)
> 2) we assume deterministic tuple order, as Stephan pointed out
>
>> On 05 Feb 2016, at 13:41, Niels Basjes <Niels@basjes.nl> wrote:
>>
>> Hi,
>>
>> Buffering the data (in all cases) would hurt the latency so much that 
>> Flink is effectively reverting to microbatching (where the batch size is 
>> the checkpoint period) with regard to the output.
>>
>> My initial thoughts on how to solve this were as follows:
>> 1) The output persists the ID of the last message it wrote to Kafka 
>> in the checkpoint.
>> 2) Upon recovery the sink would
>> 2a) Record the offset Kafka is at, at that point in time
>> 2b) For all 'new' messages, check whether it must write this message by 
>> reading from Kafka (starting at the offset in the checkpoint); if 
>> the message is already present, skip it.
>> 3) If a message arrives that has not yet been written, it is 
>> written. Under the assumption that the messages arrive in the same 
>> order as before, the sink can now simply run as normal.
>>
>> This way the performance is only impacted in the (short) period after 
>> the recovery of a disturbance.
>>
>> What do you think?
>>
>> Niels Basjes
>>
>>
>>
>> On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>     Hi Niels!
>>
>>     In general, exactly once output requires transactional
>>     cooperation from the target system. Kafka has that on the
>>     roadmap, we should be able to integrate that once it is out.
>>     That means output is "committed" upon completed checkpoints,
>>     which guarantees nothing is written multiple times.
>>
>>     Chesnay is working on an interesting prototype as a generic
>>     solution (also for Kafka, while they don't have that feature):
>>     It buffers the data in the sink persistently (using the fault
>>     tolerance state backends) and pushes the results out on
>>     notification of a completed checkpoint.
>>     That gives you exactly once semantics, but involves an extra
>>     materialization of the data.
>>
>>
>>     I think that there is actually a fundamental latency issue with
>>     "exactly once sinks", no matter how you implement them in any
>>     systems:
>>     You can only commit once you are sure that everything went well,
>>     to a specific point where you are sure no replay will ever be needed.
>>
>>     So the latency in Flink for an exactly-once output would be at
>>     least the checkpoint interval.
>>
>>     I'm eager to hear your thoughts on this.
>>
>>     Greetings,
>>     Stephan
>>
>>
>>     On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <Niels@basjes.nl> wrote:
>>
>>         Hi,
>>
>>         It is my understanding that the exactly-once semantics
>>         regarding the input from Kafka is based on the checkpointing
>>         in the source component retaining the offset where it was at
>>         the checkpoint moment.
>>
>>         My question is how does that work for a sink? How can I make
>>         sure that (in light of failures) each message that is read
>>         from Kafka (my input) is written to Kafka (my output) exactly
>>         once?
>>
>>
>>         -- 
>>         Best regards / Met vriendelijke groeten,
>>
>>         Niels Basjes
>>
>>
>>
>>
>>
>> -- 
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>



On 05.02.2016 13:49, Paris Carbone wrote:
> From what I understood, state on sinks is included in the operator state of the sinks and pushed to Kafka when the 3-phase commit is complete,
> i.e. when the checkpoint completion notification arrives at the sinks.
>
> There are several pitfalls I am really curious to check and see how they are (going to be) handled; this is of course not as simple as it sounds. It really depends on the guarantees and operations the outside storage gives you. For example, how can we know that the pushed records are actually persisted in Kafka in a single transaction?
>
> @Chesnay can you tell us more?
>
>> On 05 Feb 2016, at 13:33, Paris Carbone <pa...@kth.se> wrote:
>>
>> That would be good indeed. I just learned about it from what Stephan mentioned. It sounds broadly correct to me, but it would be nice to see the details.
>>
>>> On 05 Feb 2016, at 13:32, Ufuk Celebi <uc...@apache.org> wrote:
>>>
>>>
>>>> On 05 Feb 2016, at 13:28, Paris Carbone <pa...@kth.se> wrote:
>>>>
>>>> Hi Gabor,
>>>>
>>>> The sinks should be aware that the global checkpoint is indeed persisted before emitting, so they will have to wait until they are notified of its completion before pushing to Kafka. The current view of the local state is not the actual persisted view, so checking against it is like relying on dirty state. Imagine the following scenario:
>>>>
>>>> 1) sink pushes record k to Kafka and updates its local buffer for k
>>>> 2) sink snapshots k and the rest of its state on the checkpoint barrier
>>>> 3) the global checkpoint fails for some reason (e.g. another sink subtask failed) and the job gets restarted
>>>> 4) sink pushes record k to Kafka again, since the last global snapshot did not complete and k is not in the local buffer
>>>>
>>>> Chesnay’s approach seems to be valid and best effort for the time being.
>>> Chesnay’s approach is not part of this thread. Can you or Chesnay elaborate/provide a link?
>>>
>>> – Ufuk
>>>


Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Paris Carbone <pa...@kth.se>.
From what I understood, state on sinks is included in the operator state of the sinks and pushed to Kafka when the 3-phase commit is complete,
i.e. when the checkpoint completion notification arrives at the sinks.

There are several pitfalls I am really curious to check and see how they are (going to be) handled; this is of course not as simple as it sounds. It really depends on the guarantees and operations the outside storage gives you. For example, how can we know that the pushed records are actually persisted in Kafka in a single transaction?

@Chesnay can you tell us more?

> On 05 Feb 2016, at 13:33, Paris Carbone <pa...@kth.se> wrote:
> 
> That would be good indeed. I just learned about it from what Stephan mentioned. It sounds broadly correct to me, but it would be nice to see the details.
> 
>> On 05 Feb 2016, at 13:32, Ufuk Celebi <uc...@apache.org> wrote:
>> 
>> 
>>> On 05 Feb 2016, at 13:28, Paris Carbone <pa...@kth.se> wrote:
>>> 
>>> Hi Gabor,
>>> 
>>> The sinks should be aware that the global checkpoint is indeed persisted before emitting, so they will have to wait until they are notified of its completion before pushing to Kafka. The current view of the local state is not the actual persisted view, so checking against it is like relying on dirty state. Imagine the following scenario:
>>> 
>>> 1) sink pushes record k to Kafka and updates its local buffer for k
>>> 2) sink snapshots k and the rest of its state on the checkpoint barrier
>>> 3) the global checkpoint fails for some reason (e.g. another sink subtask failed) and the job gets restarted
>>> 4) sink pushes record k to Kafka again, since the last global snapshot did not complete and k is not in the local buffer
>>> 
>>> Chesnay’s approach seems to be valid and best effort for the time being.
>> 
>> Chesnay’s approach is not part of this thread. Can you or Chesnay elaborate/provide a link?
>> 
>> – Ufuk
>> 
> 


Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Paris Carbone <pa...@kth.se>.
That would be good indeed. I just learned about it from what Stephan mentioned. It sounds broadly correct to me, but it would be nice to see the details.

> On 05 Feb 2016, at 13:32, Ufuk Celebi <uc...@apache.org> wrote:
> 
> 
>> On 05 Feb 2016, at 13:28, Paris Carbone <pa...@kth.se> wrote:
>> 
>> Hi Gabor,
>> 
>> The sinks should be aware that the global checkpoint is indeed persisted before emitting, so they will have to wait until they are notified of its completion before pushing to Kafka. The current view of the local state is not the actual persisted view, so checking against it is like relying on dirty state. Imagine the following scenario:
>> 
>> 1) sink pushes record k to Kafka and updates its local buffer for k
>> 2) sink snapshots k and the rest of its state on the checkpoint barrier
>> 3) the global checkpoint fails for some reason (e.g. another sink subtask failed) and the job gets restarted
>> 4) sink pushes record k to Kafka again, since the last global snapshot did not complete and k is not in the local buffer
>> 
>> Chesnay’s approach seems to be valid and best effort for the time being.
> 
> Chesnay’s approach is not part of this thread. Can you or Chesnay elaborate/provide a link?
> 
> – Ufuk
> 


Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Ufuk Celebi <uc...@apache.org>.
> On 05 Feb 2016, at 13:28, Paris Carbone <pa...@kth.se> wrote:
> 
> Hi Gabor,
> 
> The sinks should be aware that the global checkpoint is indeed persisted before emitting, so they will have to wait until they are notified of its completion before pushing to Kafka. The current view of the local state is not the actual persisted view, so checking against it is like relying on dirty state. Imagine the following scenario:
> 
> 1) sink pushes record k to Kafka and updates its local buffer for k
> 2) sink snapshots k and the rest of its state on the checkpoint barrier
> 3) the global checkpoint fails for some reason (e.g. another sink subtask failed) and the job gets restarted
> 4) sink pushes record k to Kafka again, since the last global snapshot did not complete and k is not in the local buffer
> 
> Chesnay’s approach seems to be valid and best effort for the time being.

Chesnay’s approach is not part of this thread. Can you or Chesnay elaborate/provide a link?

– Ufuk


Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Paris Carbone <pa...@kth.se>.
Hi Gabor,

The sinks should be aware that the global checkpoint is indeed persisted before emitting, so they will have to wait until they are notified of its completion before pushing to Kafka. The current view of the local state is not the actual persisted view, so checking against it is like relying on dirty state. Imagine the following scenario:

1) sink pushes record k to Kafka and updates its local buffer for k
2) sink snapshots k and the rest of its state on the checkpoint barrier
3) the global checkpoint fails for some reason (e.g. another sink subtask failed) and the job gets restarted
4) sink pushes record k to Kafka again, since the last global snapshot did not complete and k is not in the local buffer
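
The four steps of this scenario can be traced in a small simulation. It is a sketch under simplifying assumptions: a Python list stands in for the Kafka topic, a set stands in for the sink's local buffer, and the function and variable names are hypothetical.

```python
def simulate_failed_checkpoint():
    kafka = []                # stands in for the output topic
    completed_state = set()   # sink state in the last *completed* checkpoint

    # 1) the sink pushes record k and remembers it in its local buffer
    local_seen = set(completed_state)
    kafka.append("k")
    local_seen.add("k")

    # 2) the sink snapshots its local state on the checkpoint barrier
    pending_snapshot = set(local_seen)

    # 3) the global checkpoint fails, so the snapshot is never promoted
    checkpoint_succeeded = False
    if checkpoint_succeeded:
        completed_state = pending_snapshot

    # the job restarts from the last completed checkpoint
    local_seen = set(completed_state)

    # 4) k is replayed; the restored buffer does not contain it,
    #    so it is pushed a second time
    if "k" not in local_seen:
        kafka.append("k")
        local_seen.add("k")
    return kafka
```

Calling simulate_failed_checkpoint() returns a topic that contains "k" twice, which is the duplicate the scenario warns about.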

Chesnay’s approach seems to be valid and best effort for the time being.

Paris

> On 05 Feb 2016, at 13:09, Gábor Gévay <gg...@gmail.com> wrote:
> 
> Hello,
> 
>> I think that there is actually a fundamental latency issue with
>> "exactly once sinks", no matter how you implement them in any systems:
>> You can only commit once you are sure that everything went well,
>> to a specific point where you are sure no replay will ever be needed.
> 
> What if the persistent buffer in the sink would be used to determine
> which data elements should be emitted in case of a replay? I mean, the
> sink pushes everything as soon as it arrives, and also writes
> everything to the persistent buffer, and then in case of a replay it
> looks into the buffer before pushing every element, and only does the
> push if the buffer says that the element was not pushed before.
> 
> Best,
> Gábor
> 
> 
> 2016-02-05 11:57 GMT+01:00 Stephan Ewen <se...@apache.org>:
>> Hi Niels!
>> 
>> In general, exactly once output requires transactional cooperation from the
>> target system. Kafka has that on the roadmap, we should be able to integrate
>> that once it is out.
>> That means output is "committed" upon completed checkpoints, which
>> guarantees nothing is written multiple times.
>> 
>> Chesnay is working on an interesting prototype as a generic solution (also
>> for Kafka, while they don't have that feature):
>> It buffers the data in the sink persistently (using the fault tolerance
>> state backends) and pushes the results out on notification of a completed
>> checkpoint.
>> That gives you exactly once semantics, but involves an extra materialization
>> of the data.
>> 
>> 
>> I think that there is actually a fundamental latency issue with "exactly
>> once sinks", no matter how you implement them in any systems:
>> You can only commit once you are sure that everything went well, to a
>> specific point where you are sure no replay will ever be needed.
>> 
>> So the latency in Flink for an exactly-once output would be at least the
>> checkpoint interval.
>> 
>> I'm eager to hear your thoughts on this.
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <Ni...@basjes.nl> wrote:
>>> 
>>> Hi,
>>> 
>>> It is my understanding that the exactly-once semantics regarding the input
>>> from Kafka is based on the checkpointing in the source component retaining
>>> the offset where it was at the checkpoint moment.
>>> 
>>> My question is how does that work for a sink? How can I make sure that (in
>>> light of failures) each message that is read from Kafka (my input) is
>>> written to Kafka (my output) exactly once?
>>> 
>>> 
>>> --
>>> Best regards / Met vriendelijke groeten,
>>> 
>>> Niels Basjes
>> 
>> 


Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Stephan Ewen <se...@apache.org>.
@Gabor: That assumes deterministic streams and to some extent deterministic
tuple order.
That may be given sometimes, but it is a very strong assumption in many
cases.

On Fri, Feb 5, 2016 at 1:09 PM, Gábor Gévay <gg...@gmail.com> wrote:

> Hello,
>
> > I think that there is actually a fundamental latency issue with
> > "exactly once sinks", no matter how you implement them in any systems:
> > You can only commit once you are sure that everything went well,
> > to a specific point where you are sure no replay will ever be needed.
>
> What if the persistent buffer in the sink would be used to determine
> which data elements should be emitted in case of a replay? I mean, the
> sink pushes everything as soon as it arrives, and also writes
> everything to the persistent buffer, and then in case of a replay it
> looks into the buffer before pushing every element, and only does the
> push if the buffer says that the element was not pushed before.
>
> Best,
> Gábor
>
>
> 2016-02-05 11:57 GMT+01:00 Stephan Ewen <se...@apache.org>:
> > Hi Niels!
> >
> > In general, exactly once output requires transactional cooperation from
> the
> > target system. Kafka has that on the roadmap, we should be able to
> integrate
> > that once it is out.
> > That means output is "committed" upon completed checkpoints, which
> > guarantees nothing is written multiple times.
> >
> > Chesnay is working on an interesting prototype as a generic solution
> (also
> > for Kafka, while they don't have that feature):
> > It buffers the data in the sink persistently (using the fault tolerance
> > state backends) and pushes the results out on notification of a completed
> > checkpoint.
> > That gives you exactly once semantics, but involves an extra
> materialization
> > of the data.
> >
> >
> > I think that there is actually a fundamental latency issue with "exactly
> > once sinks", no matter how you implement them in any systems:
> > You can only commit once you are sure that everything went well, to a
> > specific point where you are sure no replay will ever be needed.
> >
> > So the latency in Flink for an exactly-once output would be at least the
> > checkpoint interval.
> >
> > I'm eager to hear your thoughts on this.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <Ni...@basjes.nl> wrote:
> >>
> >> Hi,
> >>
> >> It is my understanding that the exactly-once semantics regarding the
> input
> >> from Kafka is based on the checkpointing in the source component
> retaining
> >> the offset where it was at the checkpoint moment.
> >>
> >> My question is how does that work for a sink? How can I make sure that
> (in
> >> light of failures) each message that is read from Kafka (my input) is
> >> written to Kafka (my output) exactly once?
> >>
> >>
> >> --
> >> Best regards / Met vriendelijke groeten,
> >>
> >> Niels Basjes
> >
> >
>

Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Gábor Gévay <gg...@gmail.com>.
Hello,

> I think that there is actually a fundamental latency issue with
> "exactly once sinks", no matter how you implement them in any systems:
> You can only commit once you are sure that everything went well,
> to a specific point where you are sure no replay will ever be needed.

What if the persistent buffer in the sink would be used to determine
which data elements should be emitted in case of a replay? I mean, the
sink pushes everything as soon as it arrives, and also writes
everything to the persistent buffer, and then in case of a replay it
looks into the buffer before pushing every element, and only does the
push if the buffer says that the element was not pushed before.
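
A rough sketch of this idea follows. It rests on assumptions that are not guaranteed in practice: the buffer survives the failure, the push and the buffer update never get out of sync, and elements can be compared by value. All names are hypothetical, and a list stands in for the target system.

```python
def eager_sink_with_replay_check(first_run, replay):
    target = []      # stands in for the external system
    pushed = set()   # persistent buffer, assumed to survive the failure

    # Normal operation: push immediately and record the element.
    for element in first_run:
        target.append(element)
        pushed.add(element)

    # After a failure the upstream replays; push only unseen elements.
    for element in replay:
        if element not in pushed:
            target.append(element)
            pushed.add(element)
    return target
```

With a deterministic replay, eager_sink_with_replay_check(["a", "b"], ["a", "b", "c"]) writes each element once. If the replay produces different elements for the same input, for example ["a@7", "b@8"] after a first run of ["a@1", "b@2"], the lookup misses and all four elements end up in the target, which is the determinism caveat raised elsewhere in this thread.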

Best,
Gábor


2016-02-05 11:57 GMT+01:00 Stephan Ewen <se...@apache.org>:
> Hi Niels!
>
> In general, exactly once output requires transactional cooperation from the
> target system. Kafka has that on the roadmap, we should be able to integrate
> that once it is out.
> That means output is "committed" upon completed checkpoints, which
> guarantees nothing is written multiple times.
>
> Chesnay is working on an interesting prototype as a generic solution (also
> for Kafka, while they don't have that feature):
> It buffers the data in the sink persistently (using the fault tolerance
> state backends) and pushes the results out on notification of a completed
> checkpoint.
> That gives you exactly once semantics, but involves an extra materialization
> of the data.
>
>
> I think that there is actually a fundamental latency issue with "exactly
> once sinks", no matter how you implement them in any systems:
> You can only commit once you are sure that everything went well, to a
> specific point where you are sure no replay will ever be needed.
>
> So the latency in Flink for an exactly-once output would be at least the
> checkpoint interval.
>
> I'm eager to hear your thoughts on this.
>
> Greetings,
> Stephan
>
>
> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <Ni...@basjes.nl> wrote:
>>
>> Hi,
>>
>> It is my understanding that the exactly-once semantics regarding the input
>> from Kafka is based on the checkpointing in the source component retaining
>> the offset where it was at the checkpoint moment.
>>
>> My question is how does that work for a sink? How can I make sure that (in
>> light of failures) each message that is read from Kafka (my input) is
>> written to Kafka (my output) exactly once?
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>
>

Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Stephan Ewen <se...@apache.org>.
Hi Niels!

That could actually work, given a way to identify messages with a unique ID.

Would be quite an exercise to implement...

Stephan


On Fri, Feb 5, 2016 at 2:14 PM, Niels Basjes <Ni...@basjes.nl> wrote:

> @Stephan;
> Kafka keeps the messages for a configured TTL (e.g. a few days/weeks).
> So my idea is based on the fact that Kafka has all the messages and that I
> can read those messages from Kafka to validate if I should or should not
> write them again.
>
> Let me illustrate what I had in mind:
> I write messages to Kafka and at the moment of the checkpoint the last
> message ID I wrote is 5.
> Then I write 6,7,8
> FAIL
> Recover:
> Open a reader starting at message 5
> Get message 6 -> Read from Kafka --> Already have this --> Skip
> Get message 7 -> Read from Kafka --> Already have this --> Skip
> Get message 8 -> Read from Kafka --> Already have this --> Skip
> Get message 9 -> Read from Kafka --> Not yet in Kafka --> Write and resume
> normal operations.
>
> Like I said: This is just the first rough idea I had on a possible
> direction how this can be solved without the latency impact of buffering.
>
> Niels Basjes
>
>
> On Fri, Feb 5, 2016 at 2:06 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> @Niels: I don't fully understand your approach so far.
>>
>> If you write a message to Kafka between two checkpoints, where do you
>> store the information that this particular message is already written (I
>> think this would be the ID in your example)?
>> Such information would need to be persisted for every written message
>> (or very small group of messages).
>>
>> Stephan
>>
>>
>> On Fri, Feb 5, 2016 at 1:41 PM, Niels Basjes <Ni...@basjes.nl> wrote:
>>
>>> Hi,
>>>
>>> Buffering the data (in all cases) would hurt the latency so much that
>>> Flink is effectively reverting to microbatching (where the batch size is
>>> the checkpoint period) with regard to the output.
>>>
>>> My initial thoughts on how to solve this were as follows:
>>> 1) The output persists the ID of the last message it wrote to Kafka in
>>> the checkpoint.
>>> 2) Upon recovery the sink would
>>> 2a) Record the offset Kafka is at, at that point in time
>>> 2b) For all 'new' messages, check whether it must write this message by
>>> reading from Kafka (starting at the offset in the checkpoint); if the
>>> message is already present, skip it.
>>> 3) If a message arrives that has not yet been written, it is written.
>>> Under the assumption that the messages arrive in the same order as before,
>>> the sink can now simply run as normal.
>>>
>>> This way the performance is only impacted in the (short) period after
>>> the recovery of a disturbance.
>>>
>>> What do you think?
>>>
>>> Niels Basjes
>>>
>>>
>>>
>>> On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi Niels!
>>>>
>>>> In general, exactly once output requires transactional cooperation from
>>>> the target system. Kafka has that on the roadmap, we should be able to
>>>> integrate that once it is out.
>>>> That means output is "committed" upon completed checkpoints, which
>>>> guarantees nothing is written multiple times.
>>>>
>>>> Chesnay is working on an interesting prototype as a generic solution
>>>> (also for Kafka, while they don't have that feature):
>>>> It buffers the data in the sink persistently (using the fault tolerance
>>>> state backends) and pushes the results out on notification of a completed
>>>> checkpoint.
>>>> That gives you exactly once semantics, but involves an extra
>>>> materialization of the data.
>>>>
>>>>
>>>> I think that there is actually a fundamental latency issue with
>>>> "exactly once sinks", no matter how you implement them in any systems:
>>>> You can only commit once you are sure that everything went well, to a
>>>> specific point where you are sure no replay will ever be needed.
>>>>
>>>> So the latency in Flink for an exactly-once output would be at least
>>>> the checkpoint interval.
>>>>
>>>> I'm eager to hear your thoughts on this.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <Ni...@basjes.nl> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It is my understanding that the exactly-once semantics regarding the
>>>>> input from Kafka is based on the checkpointing in the source component
>>>>> retaining the offset where it was at the checkpoint moment.
>>>>>
>>>>> My question is how does that work for a sink? How can I make sure that
>>>>> (in light of failures) each message that is read from Kafka (my input) is
>>>>> written to Kafka (my output) exactly once?
>>>>>
>>>>>
>>>>> --
>>>>> Best regards / Met vriendelijke groeten,
>>>>>
>>>>> Niels Basjes
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>

Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Niels Basjes <Ni...@basjes.nl>.
@Stephan;
Kafka keeps the messages for a configured TTL (e.g. a few days/weeks).
So my idea is based on the fact that Kafka has all the messages and that I
can read those messages from Kafka to validate if I should or should not
write them again.

Let me illustrate what I had in mind:
I write messages to Kafka and at the moment of the checkpoint the last
message ID I wrote is 5.
Then I write 6,7,8
FAIL
Recover:
Open a reader starting at message 5
Get message 6 -> Read from Kafka --> Already have this --> Skip
Get message 7 -> Read from Kafka --> Already have this --> Skip
Get message 8 -> Read from Kafka --> Already have this --> Skip
Get message 9 -> Read from Kafka --> Not yet in Kafka --> Write and resume
normal operations.
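
The recovery walk-through above could look roughly like this. It is only a sketch: a Python list of message IDs stands in for the output topic, the function name recover_and_resume is hypothetical, and it relies on the stated assumption that messages are replayed in the same order as before the failure.

```python
def recover_and_resume(topic, checkpointed_id, replayed):
    """Skip replayed messages that are already in the topic, write the rest.

    topic: message IDs already in the output topic, in order.
    checkpointed_id: last ID written as of the last completed checkpoint.
    replayed: IDs the job produces again after recovery.
    """
    # Open a reader just after the checkpointed message and remember
    # what was written between the checkpoint and the failure.
    start = topic.index(checkpointed_id) + 1
    already_written = topic[start:]

    position = 0
    skipped = []
    for message_id in replayed:
        if (position < len(already_written)
                and already_written[position] == message_id):
            # Already in Kafka: skip it.
            skipped.append(message_id)
            position += 1
        else:
            # Not yet in Kafka: write it and carry on as normal.
            topic.append(message_id)
    return skipped
```

For the example above, a topic holding IDs 1 to 8 with checkpointed_id 5 and a replay of 6, 7, 8, 9, 10 skips 6, 7 and 8 and appends 9 and 10.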

Like I said: This is just the first rough idea I had on a possible
direction how this can be solved without the latency impact of buffering.

Niels Basjes


On Fri, Feb 5, 2016 at 2:06 PM, Stephan Ewen <se...@apache.org> wrote:

> @Niels: I don't fully understand your approach so far.
>
> If you write a message to Kafka between two checkpoints, where do you
> store the information that this particular message is already written (I
> think this would be the ID in your example)?
> Such information would need to be persisted for every written message
> (or very small group of messages).
>
> Stephan
>
>
> On Fri, Feb 5, 2016 at 1:41 PM, Niels Basjes <Ni...@basjes.nl> wrote:
>
>> Hi,
>>
>> Buffering the data (in all cases) would hurt the latency so much that
>> Flink is effectively reverting to microbatching (where the batch size is
>> the checkpoint period) with regard to the output.
>>
>> My initial thoughts on how to solve this were as follows:
>> 1) The output persists the ID of the last message it wrote to Kafka in
>> the checkpoint.
>> 2) Upon recovery the sink would
>> 2a) Record the offset Kafka is at, at that point in time
>> 2b) For all 'new' messages, check whether it must write this message by
>> reading from Kafka (starting at the offset in the checkpoint); if the
>> message is already present, skip it.
>> 3) If a message arrives that has not yet been written, it is written.
>> Under the assumption that the messages arrive in the same order as before,
>> the sink can now simply run as normal.
>>
>> This way the performance is only impacted in the (short) period after the
>> recovery of a disturbance.
>>
>> What do you think?
>>
>> Niels Basjes
>>
>>
>>
>> On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi Niels!
>>>
>>> In general, exactly once output requires transactional cooperation from
>>> the target system. Kafka has that on the roadmap, we should be able to
>>> integrate that once it is out.
>>> That means output is "committed" upon completed checkpoints, which
>>> guarantees nothing is written multiple times.
>>>
>>> Chesnay is working on an interesting prototype as a generic solution
>>> (also for Kafka, while they don't have that feature):
>>> It buffers the data in the sink persistently (using the fault tolerance
>>> state backends) and pushes the results out on notification of a completed
>>> checkpoint.
>>> That gives you exactly once semantics, but involves an extra
>>> materialization of the data.
>>>
>>>
>>> I think that there is actually a fundamental latency issue with "exactly
>>> once sinks", no matter how you implement them in any systems:
>>> You can only commit once you are sure that everything went well, to a
>>> specific point where you are sure no replay will ever be needed.
>>>
>>> So the latency in Flink for an exactly-once output would be at least the
>>> checkpoint interval.
>>>
>>> I'm eager to hear your thoughts on this.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <Ni...@basjes.nl> wrote:
>>>
>>>> Hi,
>>>>
>>>> It is my understanding that the exactly-once semantics regarding the
>>>> input from Kafka is based on the checkpointing in the source component
>>>> retaining the offset where it was at the checkpoint moment.
>>>>
>>>> My question is how does that work for a sink? How can I make sure that
>>>> (in light of failures) each message that is read from Kafka (my input) is
>>>> written to Kafka (my output) exactly once?
>>>>
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Stephan Ewen <se...@apache.org>.
@Niels: I don't fully understand your approach so far.

If you write a message to Kafka between two checkpoints, where do you store
the information that this particular message is already written (I think
this would be the ID in your example)?
Such information would need to be persisted for every written message
(or very small group of messages).

Stephan


On Fri, Feb 5, 2016 at 1:41 PM, Niels Basjes <Ni...@basjes.nl> wrote:

> Hi,
>
> Buffering the data (in all cases) would hurt the latency so much that
> Flink is effectively reverting to microbatching (where the batch size is
> the checkpoint period) with regard to the output.
>
> My initial thoughts on how to solve this were as follows:
> 1) The output persists the ID of the last message it wrote to Kafka in the
> checkpoint.
> 2) Upon recovery the sink would
> 2a) Record the offset Kafka is at, at that point in time
> 2b) For all 'new' messages, check whether it must write this message by
> reading from Kafka (starting at the offset in the checkpoint); if the
> message is already present, skip it.
> 3) If a message arrives that has not yet been written, it is written.
> Under the assumption that the messages arrive in the same order as before,
> the sink can now simply run as normal.
>
> This way the performance is only impacted in the (short) period after the
> recovery of a disturbance.
>
> What do you think?
>
> Niels Basjes
>
>
>

Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Gábor Gévay <gg...@gmail.com>.
The way I imagine this is that the sink would have its "own
checkpoints" separately from the rest of the system, with a much
smaller interval, and would write to Kafka (with "transactional
cooperation", as Stephan mentioned) while making these checkpoints.
Then, when a replay happens from a global system checkpoint, it can
look at its own checkpoints to decide for each tuple whether to send
it or not.
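
A minimal sketch of this idea, as a plain Python simulation rather than Flink or Kafka code. All names here (TransactionalTarget, SelfCheckpointingSink, committed_position) are hypothetical. It assumes the target can commit a batch together with the sink's position atomically, and that the replayed tuples arrive in the same order as before:

```python
class TransactionalTarget:
    """Stand-in for an output system with transactional cooperation."""

    def __init__(self):
        self.records = []
        self.committed_position = 0  # the sink's "own checkpoint"

    def commit(self, batch, position):
        # Assumed atomic: data and position become visible together.
        self.records.extend(batch)
        self.committed_position = position


class SelfCheckpointingSink:
    def __init__(self, target, batch_size, start_position=0):
        self.target = target
        self.batch_size = batch_size
        # Number of tuples seen so far; restored from the global checkpoint.
        self.position = start_position
        self.batch = []

    def invoke(self, record):
        self.position += 1
        if self.position <= self.target.committed_position:
            return  # already committed before the failure: do not send again
        self.batch.append(record)
        if len(self.batch) >= self.batch_size:
            self.target.commit(self.batch, self.position)
            self.batch = []


def run_replay_demo():
    target = TransactionalTarget()
    sink = SelfCheckpointingSink(target, batch_size=2)
    for record in ["t1", "t2", "t3"]:
        sink.invoke(record)  # t1 and t2 are committed; t3 is only buffered
    # Failure: the buffered t3 is lost, the job replays from global position 0.
    sink = SelfCheckpointingSink(target, batch_size=2, start_position=0)
    for record in ["t1", "t2", "t3", "t4"]:
        sink.invoke(record)
    return target.records
```

In this toy run the replayed t1 and t2 are skipped because the sink's own checkpoint already covers them, so each tuple ends up in the target once. The skip decision is purely positional, which is why it depends on a deterministic tuple order.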

@Stephan:

> That assumes deterministic streams and to some extent deterministic tuple order.
> That may be given sometimes, but it is a very strong assumption in many cases.

Ah yes, you are right. But doing everything based on event time points
in the direction of deterministic streams, right?

Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Paris Carbone <pa...@kth.se>.
This is not a bad take. It still makes a few assumptions:

1) the output checkpoints the last *known* ID that was *persisted* in Kafka (not just pushed)
2) we assume deterministic tuple order, as Stephan pointed out

On 05 Feb 2016, at 13:41, Niels Basjes <Ni...@basjes.nl> wrote:

Hi,

Buffering the data (in all cases) would hurt the latency so much that Flink is effectively reverting to microbatching (where the batch size is the checkpoint period) with regard to the output.

My initial thoughts on how to solve this were as follows:
1) The output persists the ID of the last message it wrote to Kafka in the checkpoint.
2) Upon recovery the sink would
2a) Record the offset Kafka is at at that point in time
2b) For all 'new' messages, validate whether it must write this message by reading from Kafka (starting at the offset in the checkpoint); if the message is already present, it would skip it.
3) If a message arrives that has not yet been written, it is written. Under the assumption that the messages arrive in the same order as before, the sink can now simply run as normal.

This way the performance is only impacted in the (short) period after recovery from a disturbance.

What do you think?

Niels Basjes




Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Niels Basjes <Ni...@basjes.nl>.
Hi,

Buffering the data (in all cases) would hurt the latency so much that Flink
is effectively reverting to microbatching (where the batch size is the
checkpoint period) with regard to the output.

My initial thoughts on how to solve this were as follows:
1) The output persists the ID of the last message it wrote to Kafka in the
checkpoint.
2) Upon recovery the sink would
2a) Record the offset Kafka is at at that point in time
2b) For all 'new' messages, validate whether it must write this message by
reading from Kafka (starting at the offset in the checkpoint); if the
message is already present, it would skip it.
3) If a message arrives that has not yet been written, it is written.
Under the assumption that the messages arrive in the same order as before,
the sink can now simply run as normal.

This way the performance is only impacted in the (short) period after
recovery from a disturbance.
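
The steps above can be sketched roughly as follows. This is a plain Python simulation, not Flink or Kafka code: the output topic is modelled as a list, the checkpoint stores the output offset instead of a message ID, and the class and method names are made up for illustration. It relies on the ordering assumption from step 3 and raises an error if the replay does not match what was already written:

```python
from collections import deque


class DedupOnRecoverySink:
    def __init__(self, topic):
        self.topic = topic      # list standing in for the output Kafka partition
        self.to_skip = deque()  # messages written after the checkpoint, before the failure

    def snapshot_state(self):
        # Step 1: remember how far the output had been written at checkpoint time.
        return len(self.topic)

    def restore(self, checkpointed_offset):
        end_offset = len(self.topic)  # step 2a: where the output is right now
        # Step 2b: read back everything written since the checkpoint.
        self.to_skip = deque(self.topic[checkpointed_offset:end_offset])

    def invoke(self, message):
        if self.to_skip:
            if self.to_skip[0] != message:
                raise RuntimeError("replay order differs from the original run")
            self.to_skip.popleft()  # already present in the output: skip it
            return False
        self.topic.append(message)  # step 3: not written yet, so write it
        return True


def run_recovery_demo():
    topic = []
    sink = DedupOnRecoverySink(topic)
    sink.invoke("m1")
    sink.invoke("m2")
    checkpoint = sink.snapshot_state()
    sink.invoke("m3")
    sink.invoke("m4")
    # Failure here: m3 and m4 are in the topic but not covered by the checkpoint.
    sink = DedupOnRecoverySink(topic)
    sink.restore(checkpoint)
    for message in ["m3", "m4", "m5"]:  # the source replays from the checkpoint
        sink.invoke(message)
    return topic
```

Only the messages between the checkpointed offset and the offset found at recovery are checked; after that the sink writes without any lookups, which is the "short period" of extra cost mentioned above.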

What do you think?

Niels Basjes



On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi Niels!
>
> In general, exactly once output requires transactional cooperation from
> the target system. Kafka has that on the roadmap; we should be able to
> integrate that once it is out.
> That means output is "committed" upon completed checkpoints, which
> guarantees nothing is written multiple times.
>
> Chesnay is working on an interesting prototype as a generic solution (also
> for Kafka, while they don't have that feature):
> It buffers the data in the sink persistently (using the fault tolerance
> state backends) and pushes the results out on notification of a completed
> checkpoint.
> That gives you exactly once semantics, but involves an extra
> materialization of the data.
>
>
> I think that there is actually a fundamental latency issue with "exactly
> once sinks", no matter how you implement them in any system:
> You can only commit once you are sure that everything went well, up to a
> specific point where you are sure no replay will ever be needed.
>
> So the latency in Flink for an exactly-once output would be at least the
> checkpoint interval.
>
> I'm eager to hear your thoughts on this.
>
> Greetings,
> Stephan
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: How to ensure exactly-once semantics in output to Kafka?

Posted by Stephan Ewen <se...@apache.org>.
Hi Niels!

In general, exactly once output requires transactional cooperation from the
target system. Kafka has that on the roadmap; we should be able to
integrate that once it is out.
That means output is "committed" upon completed checkpoints, which
guarantees nothing is written multiple times.

Chesnay is working on an interesting prototype as a generic solution (also
for Kafka, while they don't have that feature):
It buffers the data in the sink persistently (using the fault tolerance
state backends) and pushes the results out on notification of a completed
checkpoint.
That gives you exactly once semantics, but involves an extra
materialization of the data.
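
To make the buffering idea concrete, here is a toy Python model. It is not Chesnay's prototype and uses no Flink API; the method names only loosely mirror the checkpoint lifecycle and are assumptions. It also assumes that the push to the target is atomic and that a failure discards everything not yet confirmed by a completed checkpoint:

```python
class BufferingSink:
    """Toy sink that releases records only after a checkpoint has completed."""

    def __init__(self, target):
        self.target = target  # list standing in for Kafka or any other system
        self.pending = []     # records received since the last snapshot
        self.snapshots = {}   # checkpoint id -> records covered by that checkpoint

    def invoke(self, record):
        self.pending.append(record)  # buffer only, nothing is written yet

    def snapshot_state(self, checkpoint_id):
        self.snapshots[checkpoint_id] = self.pending
        self.pending = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Push everything covered by this checkpoint (and any earlier one).
        for cid in sorted(c for c in self.snapshots if c <= checkpoint_id):
            self.target.extend(self.snapshots.pop(cid))

    def restore(self):
        # Failure: drop unconfirmed data; the source replays it.
        self.pending = []
        self.snapshots = {}


def run_buffering_demo():
    kafka = []
    sink = BufferingSink(kafka)
    sink.invoke("a")
    sink.invoke("b")
    sink.snapshot_state(1)
    sink.notify_checkpoint_complete(1)  # "a" and "b" become visible
    sink.invoke("c")
    sink.restore()                      # failure before checkpoint 2
    sink.invoke("c")                    # replayed by the source
    sink.snapshot_state(2)
    sink.notify_checkpoint_complete(2)
    return kafka
```

The record "c" is received twice but written once, because the first copy never left the buffer. The extra materialization is the `snapshots` dictionary, which in a real job would live in the state backend.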


I think that there is actually a fundamental latency issue with "exactly
once sinks", no matter how you implement them in any system:
You can only commit once you are sure that everything went well, up to a
specific point where you are sure no replay will ever be needed.

So the latency in Flink for an exactly-once output would be at least the
checkpoint interval.
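
A rough back-of-the-envelope illustration of that latency, under simplifying assumptions that are not from this thread: checkpoints are triggered at fixed multiples of the interval, each takes a fixed time to complete, and output is committed the moment the checkpoint completes. The function name and parameters are made up for this example:

```python
def commit_delay_ms(arrival_ms, interval_ms, checkpoint_duration_ms=0):
    """Time a record waits in the sink before its output is committed."""
    # The record is covered by the next checkpoint triggered after it arrives.
    next_trigger_ms = (arrival_ms // interval_ms + 1) * interval_ms
    return next_trigger_ms + checkpoint_duration_ms - arrival_ms


# With a 5 s interval and checkpoints that take 200 ms to complete:
just_after_checkpoint = commit_delay_ms(100, 5000, 200)    # waits 5100 ms
just_before_checkpoint = commit_delay_ms(4900, 5000, 200)  # waits 300 ms
```

In this simple model the delay of an individual record depends on where it falls within the interval, and a record arriving right after a checkpoint waits roughly the full interval plus the time the checkpoint takes to complete.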

I'm eager to hear your thoughts on this.

Greetings,
Stephan


On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <Ni...@basjes.nl> wrote:

> Hi,
>
> It is my understanding that the exactly-once semantics regarding the input
> from Kafka is based on the checkpointing in the source component retaining
> the offset where it was at the checkpoint moment.
>
> My question is how does that work for a sink? How can I make sure that (in
> light of failures) each message that is read from Kafka (my input) is
> written to Kafka (my output) exactly once?
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>