Posted to user@beam.apache.org by Eleanore Jin <el...@gmail.com> on 2020/07/31 21:16:52 UTC

Re: KafkaIO Exactly once vs At least Once

Hi Alex,

You previously suggested that, for exactly once with KafkaIO, I do not need
to configure KafkaIO.write().withEOS() and only need to add the producer
property enable.idempotence=true. However, from the code:
https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1311,
KafkaExactlyOnceSink is used only when withEOS() is set; otherwise
KafkaWriter is used.
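
The branch described above can be sketched as a small stand-alone model. This
is not Beam source code: the class name SinkSelection and the method
chooseWriter are hypothetical, and the sketch only assumes what is stated
above, namely that the EOS flag alone decides which writer is used.

```java
import java.util.Collections;
import java.util.Map;

// Simplified model of the writer selection described above; NOT Beam source code.
final class SinkSelection {

    // Returns the name of the writer used for a given configuration.
    static String chooseWriter(boolean eosEnabled, Map<String, Object> producerConfig) {
        // Only the EOS flag (set via withEOS) is consulted. producerConfig is
        // deliberately ignored, mirroring the behaviour described in the text.
        return eosEnabled ? "KafkaExactlyOnceSink" : "KafkaWriter";
    }

    public static void main(String[] args) {
        Map<String, Object> idempotent =
            Collections.singletonMap("enable.idempotence", true);

        // Producer property alone does not switch the writer.
        System.out.println(chooseWriter(false, idempotent)); // prints "KafkaWriter"
        // The EOS flag does.
        System.out.println(chooseWriter(true, idempotent));  // prints "KafkaExactlyOnceSink"
    }
}
```

In this model, setting enable.idempotence=true without the EOS flag still
selects KafkaWriter, which is the behaviour I am asking about.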

Did I misunderstand something? Or is withEOS() required for end-to-end
exactly once with KafkaIO as both source and sink?

Thanks a lot!
Eleanore

On Wed, Jun 24, 2020 at 12:48 PM Eleanore Jin <el...@gmail.com>
wrote:

>
> Hi Alex,
>
> Thanks a lot for the info.
>
> Eleanore
>
> On Wed, Jun 24, 2020 at 9:26 AM Alexey Romanenko <ar...@gmail.com>
> wrote:
>
>> Well, I think, in general, it will be a question of trade-off between
>> latency and performance in case of EOS sink (since EOS can’t be "for
>> free").
>>
>> I can’t recommend specific numbers for Flink (maybe Maximilian Michels or
>> others with more Flink knowledge can), but I’d just try different
>> numbers to see how it will affect the results.
>>
>> On 23 Jun 2020, at 23:27, Eleanore Jin <el...@gmail.com> wrote:
>>
>> Hi Alexey,
>>
>> Thanks a lot for the information! I will give it a try.
>>
>> Regarding the checkpoint intervals, I think the Flink community suggested
>> something between 3 and 5 minutes. I am not sure yet whether the checkpoint
>> interval can be in milliseconds. Currently, our Beam pipeline is stateless:
>> there is no other operator state or user-defined state.
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Tue, Jun 23, 2020 at 9:58 AM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>>
>>>
>>> On 23 Jun 2020, at 07:49, Eleanore Jin <el...@gmail.com> wrote:
>>>
>>> Is the KafkaIO EOS-sink you referred to KafkaExactlyOnceSink? If so, the
>>> way I understand it is used is KafkaIO.write().withEOS(numPartitionsForSinkTopic,
>>> "mySlinkGroupId"). Reading your response, do I additionally need to
>>> configure the KafkaProducer property enable.idempotence=true, or do I only
>>> need to configure this property?
>>>
>>>
>>> No, you don’t need to do that. New KafkaProducer will be created with
>>> this option set in KafkaExactlyOnceSink [1].
>>>
>>> So can you please correct me if the above settings are not optimal,
>>> and tell me whether there is any way to reduce the latency introduced by
>>> checkpointing for EOS?
>>>
>>>
>>> Your settings look fine to me. You could probably play with the checkpoint
>>> interval (why is it 10 secs?) to reduce the latency.
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/05f676ea255587ccf52ad56d84191838bb981ebb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
>>>
>>>
>>>
>>>
>>> On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>>
>>>> I think you don’t need to enable EOS in this case since KafkaIO has a
>>>> dedicated EOS-sink implementation for Beam part (btw, it’s not supported by
>>>> all runners) and it relies on setting “enable.idempotence=true” for
>>>> KafkaProducer.
>>>> I’m not sure that you can achieve “at least once” semantics with
>>>> current KafkaIO implementation.
>>>>
>>>> On 16 Jun 2020, at 17:16, Eleanore Jin <el...@gmail.com> wrote:
>>>>
>>>> Hi All,
>>>>
>>>> I previously asked a few questions regarding enabling EOS (exactly once
>>>> semantics); please see below.
>>>>
>>>> Our Beam pipeline uses KafkaIO to read from the source topic, and then
>>>> uses KafkaIO to publish to the sink topic.
>>>>
>>>> According to Max's answer to my previous questions, enabling EOS with
>>>> KafkaIO will introduce latency, because KafkaIO's ExactlyOnceWriter
>>>> processElement method is called only after the checkpoint covering the
>>>> messages received within that checkpoint interval has completed. So the
>>>> latency depends on the checkpoint interval.
>>>>
>>>> I wonder: if I relax the requirement to at least once, do I still need
>>>> to enable EOS on KafkaIO, or is it not required?
>>>> If not, can you please provide some instructions on how it should be done?
>>>>
>>>> Thanks a lot!
>>>> Eleanore
>>>>
>>>> > Thanks for the response! The reason to set up the state backend is to
>>>> > experiment with Kafka EOS with Beam running on Flink. Reading through
>>>> > the code and this PR <https://github.com/apache/beam/pull/7991/files>,
>>>> > can you please help me clarify my understanding?
>>>> >
>>>> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
>>>> > EOS. The ExactlyOnceWriter processElement method is annotated
>>>> > with @RequiresStableInput, so all the messages will be cached
>>>> > by KeyedBufferingElementsHandler, and only after the checkpoint
>>>> > succeeds will those messages be processed by ExactlyOnceWriter?
>>>>
>>>> That's correct.
>>>>
>>>> >
>>>> > 2. Upon checkpoint, will those messages cached by
>>>> > KeyedBufferingElementsHandler also be checkpointed?
>>>>
>>>> Yes, the buffered elements will be checkpointed.
>>>>
>>>> > 3. It seems the way Beam provides Kafka EOS will introduce delays in
>>>> > the stream processing, and the delay is based on the checkpoint
>>>> > interval? How can the latency be reduced while still having the EOS
>>>> > guarantee?
>>>>
>>>> Indeed, the checkpoint interval and the checkpoint duration limit the
>>>> latency. Given the current design and the guarantees, there is no other
>>>> way to influence the latency.
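
A rough way to picture the answer above is a small latency model. It is only a
sketch under stated assumptions, not how Flink or Beam compute anything:
checkpoints are assumed to trigger at fixed multiples of the interval and to
take a fixed duration, and a buffered element is assumed to be released when
the first checkpoint triggered at or after its arrival completes. The class
name CheckpointLatencyModel, its methods, and the 500 ms duration are
hypothetical; the 10 s interval is the one mentioned elsewhere in this thread.

```java
// Toy model of sink latency under buffering until checkpoint completion.
// Assumptions (not taken from Beam or Flink code): checkpoints trigger at
// intervalMs, 2 * intervalMs, ... and each takes checkpointDurationMs.
final class CheckpointLatencyModel {

    // Time at which an element arriving at arrivalMs can be written to the sink.
    static long releaseTimeMs(long arrivalMs, long intervalMs, long checkpointDurationMs) {
        // Index of the first checkpoint triggered at or after the arrival time
        // (ceiling division, with a minimum of the first checkpoint).
        long n = Math.max(1, (arrivalMs + intervalMs - 1) / intervalMs);
        return n * intervalMs + checkpointDurationMs;
    }

    // Added latency caused by waiting for the checkpoint to complete.
    static long latencyMs(long arrivalMs, long intervalMs, long checkpointDurationMs) {
        return releaseTimeMs(arrivalMs, intervalMs, checkpointDurationMs) - arrivalMs;
    }

    public static void main(String[] args) {
        long durationMs = 500; // hypothetical checkpoint duration
        // Same arrival time, 10 s interval versus 1 s interval.
        System.out.println(latencyMs(1_000, 10_000, durationMs)); // prints 9500
        System.out.println(latencyMs(1_000, 1_000, durationMs));  // prints 500
    }
}
```

In this model the worst-case added latency is roughly one interval plus one
checkpoint duration, so a shorter interval lowers latency at the cost of more
frequent checkpoints.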
>>>>
>>>> > 4. commitOffsetsInFinalize is also enabled. Does this mean that, when a
>>>> > checkpoint succeeds, the checkpointed offset will be committed back
>>>> > to Kafka? If this operation does not finish successfully, and the job
>>>> > is then cancelled/stopped and resubmitted (with the same consumer
>>>> > group for the source topics, but a different jobID), is duplicate
>>>> > processing still possible because the consumed offset was not
>>>> > committed back to Kafka?
>>>>
>>>> This option is for the Kafka consumer. AFAIK this is just a convenience
>>>> method to commit the latest checkpointed offset to Kafka. This offset is
>>>> not used when restoring from a checkpoint. However, if you don't restore
>>>> from a checkpoint, you can resume from that offset which might be
>>>> convenient or not, depending on your use case.
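
The distinction in the answer above can be sketched as a small decision
function. This is an illustrative model only, not KafkaIO code: the class name
StartOffsetModel, the method startOffset, and the fallbackOffset parameter
(standing in for whatever the consumer's reset policy would pick) are all
hypothetical.

```java
import java.util.OptionalLong;

// Toy model of where reading resumes; NOT KafkaIO code.
final class StartOffsetModel {

    static long startOffset(OptionalLong checkpointedOffset,
                            OptionalLong committedOffset,
                            long fallbackOffset) {
        // Restoring from a checkpoint: the checkpointed offset wins and the
        // offset committed to Kafka is not consulted.
        if (checkpointedOffset.isPresent()) {
            return checkpointedOffset.getAsLong();
        }
        // No checkpoint: the job can resume from the offset committed to Kafka.
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();
        }
        // Neither is available: fall back to the hypothetical reset position.
        return fallbackOffset;
    }

    public static void main(String[] args) {
        // Restore from checkpoint: committed offset 100 is ignored.
        System.out.println(startOffset(OptionalLong.of(120), OptionalLong.of(100), 0)); // prints 120
        // Fresh submission without a checkpoint: resume from committed offset.
        System.out.println(startOffset(OptionalLong.empty(), OptionalLong.of(100), 0)); // prints 100
    }
}
```

Under this model, a failed commit only matters for a job that is resubmitted
without a checkpoint: it resumes from an older committed offset and may
reprocess records.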
>>>>
>>>>
>>>>
>>>
>>