Posted to user@beam.apache.org by Eleanore Jin <el...@gmail.com> on 2020/06/16 15:16:29 UTC

KafkaIO Exactly once vs At least Once

Hi All,

I previously asked a few questions about enabling EOS (exactly-once
semantics); please see below.

Our Beam pipeline uses KafkaIO to read from a source topic and then uses
KafkaIO to publish to a sink topic.

According to Max's answer to my previous questions, enabling EOS with
KafkaIO introduces latency: only after all messages within the checkpoint
interval have been checkpointed is the processElement method of KafkaIO's
ExactlyOnceWriter called. So the latency depends on the checkpoint
interval.

I just wonder: if I relax the requirement to at-least-once, do I still need
to enable EOS on KafkaIO, or is it not required?
If not, could you please explain how it should be configured?

Thanks a lot!
Eleanore

> Thanks for the response! The reason to set up the state backend is to
> experiment with Kafka EOS with Beam running on Flink. Reading through the
> code and this PR <https://github.com/apache/beam/pull/7991/files>, can
> you please help me clarify my understanding?
>
> 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve EOS.
> The ExactlyOnceWriter processElement method is annotated with
> @RequiresStableInput, so all messages are cached by
> KeyedBufferingElementsHandler, and only after a checkpoint succeeds are
> those messages processed by ExactlyOnceWriter?

That's correct.

>
> 2. Upon checkpoint, will the messages cached by
> KeyedBufferingElementsHandler also be checkpointed?

Yes, the buffered elements will be checkpointed.
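
To make points 1 and 2 concrete, here is a toy model in plain Java of buffering elements until a checkpoint completes. It is only a sketch and not Beam or Flink code: StableInputBuffer and its methods are hypothetical names chosen for illustration, and the real handler is more involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of "buffer until the checkpoint completes"; hypothetical, not Beam or Flink code. */
class StableInputBuffer {
    private final List<String> pending = new ArrayList<>();
    private final List<String> released = new ArrayList<>();

    /** A new element is only buffered; the writer does not see it yet. */
    void onElement(String element) {
        pending.add(element);
    }

    /** Checkpoint starts: the buffered elements are copied into the snapshot. */
    List<String> snapshot() {
        return new ArrayList<>(pending);
    }

    /** Checkpoint succeeded: hand the snapshotted elements to the writer. */
    void onCheckpointComplete(List<String> snapshot) {
        released.addAll(snapshot);
        // The snapshot is a prefix of the pending list, so drop exactly that prefix.
        pending.subList(0, snapshot.size()).clear();
    }

    List<String> pending() {
        return new ArrayList<>(pending);
    }

    List<String> released() {
        return new ArrayList<>(released);
    }

    public static void main(String[] args) {
        StableInputBuffer buffer = new StableInputBuffer();
        buffer.onElement("a");
        buffer.onElement("b");
        List<String> snapshot = buffer.snapshot(); // checkpoint starts
        buffer.onElement("c");                     // arrives while the checkpoint is running
        System.out.println(buffer.released());     // [] (nothing is written before the checkpoint completes)
        buffer.onCheckpointComplete(snapshot);
        System.out.println(buffer.released());     // [a, b]
        System.out.println(buffer.pending());      // [c] (waits for the next checkpoint)
    }
}
```

Because the snapshot contains the buffered elements, a restore from that checkpoint would start with the same pending elements instead of losing them.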

> 3. It seems the way Beam provides Kafka EOS introduces delays in the
> stream processing, and the delay is based on the checkpoint interval? How
> can I reduce the latency while still having the EOS guarantee?

Indeed, the checkpoint interval and the checkpoint duration limit the
latency. Given the current design and the guarantees, there is no other
way to influence the latency.
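
As a rough illustration of how interval and duration bound the latency, the sketch below uses a deliberately simplified model: checkpoints start at fixed multiples of the interval, each takes a fixed duration, and an element is released when the first checkpoint that starts at or after its arrival completes. The class and method names are hypothetical, and real Flink scheduling (variable durations, pauses between checkpoints) is more complicated.

```java
/** Simplified latency model for an EOS sink; an illustration only, not a measurement. */
class EosLatencyModel {

    /**
     * Time an element waits before it can be written, assuming checkpoints start at
     * 0, interval, 2 * interval, ... and each one takes durationMillis to complete.
     */
    static long releaseLatencyMillis(long arrivalMillis, long intervalMillis, long durationMillis) {
        if (arrivalMillis < 0 || intervalMillis <= 0 || durationMillis < 0) {
            throw new IllegalArgumentException("arrival and duration must be >= 0, interval must be > 0");
        }
        // Round the arrival time up to the next checkpoint start.
        long nextCheckpointStart =
                ((arrivalMillis + intervalMillis - 1) / intervalMillis) * intervalMillis;
        return nextCheckpointStart + durationMillis - arrivalMillis;
    }

    public static void main(String[] args) {
        long interval = 10_000L; // 10 s checkpoint interval
        long duration = 500L;    // assumed checkpoint duration
        // Arrives exactly when a checkpoint starts: waits only for the checkpoint itself.
        System.out.println(releaseLatencyMillis(0L, interval, duration));      // 500
        // Arrives just after a checkpoint started: waits almost interval + duration.
        System.out.println(releaseLatencyMillis(1L, interval, duration));      // 10499
        System.out.println(releaseLatencyMillis(10_000L, interval, duration)); // 500
    }
}
```

Under these assumptions the latency lies between the checkpoint duration and roughly interval plus duration, which is why shortening the interval is the main lever.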

> 4. commitOffsetsInFinalize is also enabled. Does this mean that, once a
> checkpoint succeeds, the checkpointed offset is committed back to Kafka?
> If that commit does not finish successfully, the job is then
> cancelled/stopped, and the job is resubmitted (with the same consumer
> group for the source topics, but a different job ID), is duplicate
> processing still possible because the consumed offset was not committed
> back to Kafka?

This option is for the Kafka consumer. AFAIK this is just a convenience
method to commit the latest checkpointed offset to Kafka. This offset is
not used when restoring from a checkpoint. However, if you don't restore
from a checkpoint, you can resume from that offset which might be
convenient or not, depending on your use case.
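
The answer above can be summarised as a small decision rule. The sketch below is a plain-Java model with hypothetical names (StartOffsetResolver, resolve), not KafkaIO code; the final fallback to a reset offset stands in for Kafka's usual behaviour when a consumer group has no committed offset and is an addition to what the answer states.

```java
import java.util.OptionalLong;

/** Toy model of where reading resumes after a restart; hypothetical, not KafkaIO code. */
class StartOffsetResolver {

    /**
     * @param checkpointedOffset offset stored in the checkpoint being restored, if any
     * @param committedOffset    offset committed to Kafka for the consumer group, if any
     * @param resetOffset        assumed fallback when neither of the above exists
     */
    static long resolve(OptionalLong checkpointedOffset, OptionalLong committedOffset, long resetOffset) {
        if (checkpointedOffset.isPresent()) {
            // Restoring from a checkpoint: the committed offset is not consulted.
            return checkpointedOffset.getAsLong();
        }
        if (committedOffset.isPresent()) {
            // Fresh submission with the same consumer group: resume from the committed offset.
            return committedOffset.getAsLong();
        }
        return resetOffset;
    }

    public static void main(String[] args) {
        // The checkpoint reached offset 120, but the last successful commit was at 100.
        System.out.println(resolve(OptionalLong.of(120), OptionalLong.of(100), 0)); // 120
        // Resubmitted without a checkpoint: offsets 100..119 are read a second time.
        System.out.println(resolve(OptionalLong.empty(), OptionalLong.of(100), 0)); // 100
    }
}
```

In the second case the records between the committed and the checkpointed offset are read again, which is the duplicate-processing scenario raised in question 4.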

Re: KafkaIO Exactly once vs At least Once

Posted by Eleanore Jin <el...@gmail.com>.
Hi Alex,

You previously suggested that for KafkaIO exactly-once I do not need to
configure KafkaIO.write().withEOS() and only need to add the producer
property enable.idempotence=true. However, from the code:
https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1311,
KafkaExactlyOnceSink is used only when withEOS() is set; otherwise
KafkaWriter is used.

Did I misunderstand something? Or is withEOS() required for end-to-end
exactly-once with KafkaIO as both source and sink?

Thanks a lot!
Eleanore

On Wed, Jun 24, 2020 at 12:48 PM Eleanore Jin <el...@gmail.com>
wrote:

>
> Hi Alex,
>
> Thanks a lot for the info.
>
> Eleanore

Re: KafkaIO Exactly once vs At least Once

Posted by Eleanore Jin <el...@gmail.com>.
Hi Alex,

Thanks a lot for the info.

Eleanore

On Wed, Jun 24, 2020 at 9:26 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> Well, I think, in general, it will be a question of the trade-off between
> latency and performance in the case of an EOS sink (since EOS can’t be
> "for free").
>
> I can’t recommend specific numbers for Flink (maybe Maximilian Michels or
> others with more Flink knowledge can), but I’d just try different
> numbers to see how they affect the results.

Re: KafkaIO Exactly once vs At least Once

Posted by Alexey Romanenko <ar...@gmail.com>.
Well, I think, in general, it will be a question of the trade-off between latency and performance in the case of an EOS sink (since EOS can’t be "for free").

I can’t recommend specific numbers for Flink (maybe Maximilian Michels or others with more Flink knowledge can), but I’d just try different numbers to see how they affect the results.

> On 23 Jun 2020, at 23:27, Eleanore Jin <el...@gmail.com> wrote:
> 
> Hi Alexey, 
> 
> Thanks a lot for the information! I will give it a try.
> 
> Regarding the checkpoint intervals, I think the Flink community suggested something between 3 and 5 minutes. I am not sure yet whether the checkpoint interval can be set in milliseconds. Currently, our Beam pipeline is stateless; there is no other operator state or user-defined state.
> 
> Thanks a lot!
> Eleanore


Re: KafkaIO Exactly once vs At least Once

Posted by Eleanore Jin <el...@gmail.com>.
Hi Alexey,

Thanks a lot for the information! I will give it a try.

Regarding the checkpoint intervals, I think the Flink community suggested
something between 3 and 5 minutes. I am not sure yet whether the checkpoint
interval can be set in milliseconds. Currently, our Beam pipeline is
stateless; there is no other operator state or user-defined state.
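
On the milliseconds question: elsewhere in this thread the interval is set with options.setCheckpointingInterval(10_000L) and described as 10 secs, so the value is a long in milliseconds. A small plain-Java sketch for deriving such values from readable durations follows; CheckpointIntervals and its method are hypothetical helper names, and whether very short intervals are practical still has to be measured.

```java
import java.time.Duration;

/** Hypothetical helper that turns a readable duration into a millisecond interval. */
class CheckpointIntervals {

    static long toCheckpointingIntervalMillis(Duration interval) {
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException("checkpoint interval must be positive");
        }
        return interval.toMillis();
    }

    public static void main(String[] args) {
        // The value used in this thread: 10 seconds.
        System.out.println(toCheckpointingIntervalMillis(Duration.ofSeconds(10))); // 10000
        // The 3 to 5 minute range mentioned above.
        System.out.println(toCheckpointingIntervalMillis(Duration.ofMinutes(3)));  // 180000
        System.out.println(toCheckpointingIntervalMillis(Duration.ofMinutes(5)));  // 300000
        // A sub-second interval is expressible as well.
        System.out.println(toCheckpointingIntervalMillis(Duration.ofMillis(500))); // 500
    }
}
```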

Thanks a lot!
Eleanore

On Tue, Jun 23, 2020 at 9:58 AM Alexey Romanenko <ar...@gmail.com>
wrote:

>
> On 23 Jun 2020, at 07:49, Eleanore Jin <el...@gmail.com> wrote:
>
> The KafkaIO EOS sink you referred to is KafkaExactlyOnceSink? If so, the
> way I understand to use it is KafkaIO.write().withEOS(numPartitionsForSinkTopic,
> "mySlinkGroupId"). Reading your response, do I additionally need to
> configure the KafkaProducer property enable.idempotence=true, or do I only
> need to configure this property?
>
>
> No, you don’t need to do that. A new KafkaProducer is created with this
> option set in KafkaExactlyOnceSink [1].
>
> So can you please correct me if the above settings are not optimal, and
> tell me whether there is any way to reduce the latency that checkpointing
> introduces for EOS?
>
>
> Your settings look fine to me. You could probably play with the checkpoint
> interval (why is it 10 secs?) to reduce latency.
>
>
> [1]
> https://github.com/apache/beam/blob/05f676ea255587ccf52ad56d84191838bb981ebb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711

Re: KafkaIO Exactly once vs At least Once

Posted by Alexey Romanenko <ar...@gmail.com>.
> On 23 Jun 2020, at 07:49, Eleanore Jin <el...@gmail.com> wrote:
> 
> The KafkaIO EOS sink you referred to is KafkaExactlyOnceSink? If so, the way I understand to use it is KafkaIO.write().withEOS(numPartitionsForSinkTopic, "mySlinkGroupId"). Reading your response, do I additionally need to configure the KafkaProducer property enable.idempotence=true, or do I only need to configure this property?

No, you don’t need to do that. A new KafkaProducer is created with this option set in KafkaExactlyOnceSink [1].
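
A minimal sketch of what "created with this option set" could look like: the sink copies the user's producer config and forces the EOS-related properties on top. This is plain Java with hypothetical names (EosProducerConfigSketch, withEosOverrides), not the code behind [1]. The transactional.id entry is an assumption that follows Kafka's general requirement for transactional producers, and the id value is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of overlaying EOS-related producer properties on a user config. */
class EosProducerConfigSketch {

    static Map<String, Object> withEosOverrides(Map<String, Object> userConfig, String transactionalId) {
        Map<String, Object> config = new HashMap<>(userConfig); // leave the caller's map untouched
        config.put("enable.idempotence", true);                 // the option mentioned in this thread
        config.put("transactional.id", transactionalId);        // assumption: needed for Kafka transactions
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> userConfig = new HashMap<>();
        userConfig.put("bootstrap.servers", "localhost:9092"); // placeholder address
        // "my-sink-shard-0" is a made-up id for illustration only.
        Map<String, Object> effective = withEosOverrides(userConfig, "my-sink-shard-0");
        System.out.println(effective.get("enable.idempotence")); // true
        System.out.println(userConfig.containsKey("enable.idempotence")); // false
    }
}
```

The point of the sketch is that the user does not have to set enable.idempotence when the EOS sink is used, because the sink applies it regardless of the supplied config.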

> So can you please correct me if the above settings are not optimal, and tell me whether there is any way to reduce the latency that checkpointing introduces for EOS?

Your settings look fine to me. You could probably play with the checkpoint interval (why is it 10 secs?) to reduce latency.


[1] https://github.com/apache/beam/blob/05f676ea255587ccf52ad56d84191838bb981ebb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711


> 
> 
> On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko <aromanenko.dev@gmail.com> wrote:
> I think you don’t need to enable EOS in this case since KafkaIO has a dedicated EOS-sink implementation for Beam part (btw, it’s not supported by all runners) and it relies on setting “enable.idempotence=true” for KafkaProducer.
> I’m not sure that you can achieve “at least once” semantics with current KafkaIO implementation.


Re: KafkaIO Exactly once vs At least Once

Posted by Eleanore Jin <el...@gmail.com>.
Hi Alexey,

Thanks for the response. Below are some follow-up questions:

The KafkaIO EOS sink you referred to is KafkaExactlyOnceSink? If so, the way
I understand to use it is KafkaIO.write().withEOS(numPartitionsForSinkTopic,
"mySlinkGroupId"). Reading your response, do I additionally need to
configure the KafkaProducer property enable.idempotence=true, or do I only
need to configure this property?

The configuration I currently tried:
- from FlinkRunnerOptions:

   1. enable checkpoints: options.setCheckpointingInterval(10_000L);
   2. set state backend to Filesystem:
   options.setStateBackendFactory(FsBackendFactory.class);
   3. optionally set the number of retries when the pipeline fails, before
   the application exits: options.setNumberOfExecutionRetries(2);


- from KafkaIO.read():

   1. set enable.auto.commit to false for Kafka ConsumerConfig
   2. set Kafka ConsumerConfig isolation.level to read_committed via Beam
   KafkaIO.Read: withReadCommitted()
   3. commit finalized offsets to Kafka when a checkpoint is finalized:
   commitOffsetsInFinalize()

- from KafkaIO.write(), I only configured:
withEOS(numPartitionsForSinkTopic, "mySlinkGroupId")

With the above settings, the current observation is: when I inject an
artificial exception, the Beam job is triggered to restart and there is no
message loss, but messages do not show up in the output topic until the
checkpoint finishes, so the latency depends on the checkpoint interval.
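
The observation above, and the contrast with writing immediately, can be reproduced with a small plain-Java simulation. It is a toy model with hypothetical names (RestartSimulation, run): records are integers, a checkpoint is taken instantly every checkpointEvery records, and a single injected failure rewinds the source to the last checkpoint. None of this is how Flink actually schedules checkpoints.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy restart simulation; hypothetical and greatly simplified. */
class RestartSimulation {

    /** Returns what the sink topic contains after reading totalRecords with one injected failure. */
    static List<Integer> run(int totalRecords, int checkpointEvery, int failAtOffset,
                             boolean bufferUntilCheckpoint) {
        List<Integer> sinkTopic = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>(); // records read since the last checkpoint
        int checkpointedOffset = 0;               // source position in the last completed checkpoint
        int offset = 0;                           // current source position
        boolean alreadyFailed = false;

        while (offset < totalRecords) {
            if (!alreadyFailed && offset == failAtOffset) {
                // Injected exception: in-flight work is lost and the job restores the checkpoint.
                alreadyFailed = true;
                buffer.clear();
                offset = checkpointedOffset;
                continue;
            }
            int record = offset++;
            if (bufferUntilCheckpoint) {
                buffer.add(record);    // held back until the checkpoint completes
            } else {
                sinkTopic.add(record); // written right away
            }
            if (offset % checkpointEvery == 0 || offset == totalRecords) {
                sinkTopic.addAll(buffer); // checkpoint completed: buffered records become visible
                buffer.clear();
                checkpointedOffset = offset;
            }
        }
        return sinkTopic;
    }

    public static void main(String[] args) {
        // Buffered writes: every record appears exactly once, but only after its checkpoint.
        System.out.println(run(10, 4, 6, true));  // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        // Immediate writes: lower latency, but records 4 and 5 are written twice.
        System.out.println(run(10, 4, 6, false)); // [0, 1, 2, 3, 4, 5, 4, 5, 6, 7, 8, 9]
    }
}
```

In this model nothing is lost in either mode; the difference is duplicates versus waiting for the checkpoint, which is the trade-off discussed in this thread.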

So can you please correct me if the above settings are not optimal, and
tell me whether there is any way to reduce the latency that checkpointing
introduces for EOS?

Thanks a lot!
Eleanore


On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> I think you don’t need to enable EOS in this case since KafkaIO has a
> dedicated EOS-sink implementation for Beam part (btw, it’s not supported by
> all runners) and it relies on setting “enable.idempotence=true” for
> KafkaProducer.
> I’m not sure that you can achieve “at least once” semantics with current
> KafkaIO implementation.
>
> On 16 Jun 2020, at 17:16, Eleanore Jin <el...@gmail.com> wrote:
>
> Hi All,
>
> I previously asked a few questions regarding enable EOS (exactly once
> semantics) please see below.
>
> Our Beam pipeline uses KafkaIO to read from source topic, and then use
> KafkaIO to publish to sink topic.
>
> According to Max's answer to my previous questions, enable EOS with
> KafkaIO will introduce latency,
> as only after checkpoints of all message within the checkpoint interval,
> then the KafkaIO.ExactlyOnceWriter
> processElement method will be called. So the latency depends on the
> checkpoint interval.
>
> I just wonder if I relax to At Least Once, do I still need to enable EOS
> on KafkaIO? Or it is not required?
> If not, can you please provide some instruction how should it be done?
>
> Thanks a lot!
> Eleanore
>
> > Thanks for the response! the reason to setup the state backend is to
> > experiment Kafka EOS with Beam running on Flink.  Reading through the
> > code and this PR <https://github.com/apache/beam/pull/7991/files>, can
> > you please help me clarify my understanding?
> >
> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
> > EOS, ExactlyOnceWriter processElement method is annotated
> > with @RequiresStableInput, so all the messages will be cached
> > by KeyedBufferingElementsHandler, only after checkpoint succeeds, those
> > messages will be processed by ExactlyOnceWriter?
>
> That's correct.
>
> >
> > 2. Upon checkpoint, will those messages cached by
> > KeyedBufferingElementsHandler also checkpointed?
>
> Yes, the buffered elements will be checkpointed.
>
> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
> > stream processing, the delay is based on the checkpoint interval? How to
> > reduce the latency while still have EOS guarantee?
>
> Indeed, the checkpoint interval and the checkpoint duration limits the
> latency. Given the current design and the guarantees, there is no other
> way to influence the latency.
>
> > 4. commitOffsetsInFinalize is also enabled, does this mean, upon
> > checkpoint successfully, the checkpointed offset will be committed back
> > to kafka, but if this operation does not finish successfully, and then
> > the job gets cancelled/stopped, and re-submit the job again (with the
> > same consumer group for source topics, but different jobID), then it is
> > possible duplicated processing still exists? because the consumed offset
> > is not committed back to kafka?
>
> This option is for the Kafka consumer. AFAIK this is just a convenience
> method to commit the latest checkpointed offset to Kafka. This offset is
> not used when restoring from a checkpoint. However, if you don't restore
> from a checkpoint, you can resume from that offset which might be
> convenient or not, depending on your use case.
>
>
>

Re: KafkaIO Exactly once vs At least Once

Posted by Alexey Romanenko <ar...@gmail.com>.
I think you don’t need to enable EOS in this case since KafkaIO has a dedicated EOS-sink implementation for Beam part (btw, it’s not supported by all runners) and it relies on setting “enable.idempotence=true” for KafkaProducer.
I’m not sure that you can achieve “at least once” semantics with current KafkaIO implementation.

> On 16 Jun 2020, at 17:16, Eleanore Jin <el...@gmail.com> wrote:
> 
> Hi All, 
> 
> I previously asked a few questions regarding enable EOS (exactly once semantics) please see below.
> 
> Our Beam pipeline uses KafkaIO to read from source topic, and then use KafkaIO to publish to sink topic.
> 
> According to Max's answer to my previous questions, enable EOS with KafkaIO will introduce latency, 
> as only after checkpoints of all message within the checkpoint interval, then the KafkaIO.ExactlyOnceWriter
> processElement method will be called. So the latency depends on the checkpoint interval.
> 
> I just wonder if I relax to At Least Once, do I still need to enable EOS on KafkaIO? Or it is not required?
> If not, can you please provide some instruction how should it be done?
> 
> Thanks a lot!
> Eleanore
> 
> > Thanks for the response! the reason to setup the state backend is to
> > experiment Kafka EOS with Beam running on Flink.  Reading through the
> > code and this PR <https://github.com/apache/beam/pull/7991/files>, can
> > you please help me clarify my understanding?
> >
> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
> > EOS, ExactlyOnceWriter processElement method is annotated
> > with @RequiresStableInput, so all the messages will be cached
> > by KeyedBufferingElementsHandler, only after checkpoint succeeds, those
> > messages will be processed by ExactlyOnceWriter?
> 
> That's correct.
> 
> >
> > 2. Upon checkpoint, will those messages cached by
> > KeyedBufferingElementsHandler also checkpointed?
> 
> Yes, the buffered elements will be checkpointed.
> 
> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
> > stream processing, the delay is based on the checkpoint interval? How to
> > reduce the latency while still have EOS guarantee?
> 
> Indeed, the checkpoint interval and the checkpoint duration limits the
> latency. Given the current design and the guarantees, there is no other
> way to influence the latency.
> 
> > 4. commitOffsetsInFinalize is also enabled, does this mean, upon
> > checkpoint successfully, the checkpointed offset will be committed back
> > to kafka, but if this operation does not finish successfully, and then
> > the job gets cancelled/stopped, and re-submit the job again (with the
> > same consumer group for source topics, but different jobID), then it is
> > possible duplicated processing still exists? because the consumed offset
> > is not committed back to kafka?
> 
> This option is for the Kafka consumer. AFAIK this is just a convenience
> method to commit the latest checkpointed offset to Kafka. This offset is
> not used when restoring from a checkpoint. However, if you don't restore
> from a checkpoint, you can resume from that offset which might be
> convenient or not, depending on your use case.
> 
>