Posted to user@beam.apache.org by Yomal de Silva <yo...@gmail.com> on 2022/09/25 11:55:57 UTC

[Question] Using KafkaIO without a data loss

Hi all,

I have started using KafkaIO to read a data stream and have the following
questions. I would appreciate a few clarifications on them.

1. Does KafkaIO ignore the offset stored in the broker and use the offset
stored during checkpointing when consuming messages?
2. How many threads will be used by the Kafka consumer?
3. If the consumer polls a set of messages A and then later B while A is
still being processed, is there a possibility of set B finishing before A?
Does parallelism control this?
4. In the above scenario if B is committed back to the broker and somehow A
failed, upon a restart is there any way we can consume A again without
losing data?

Thank you.

Re: [Question] Using KafkaIO without a data loss

Posted by Yomal de Silva <yo...@gmail.com>.
But still, if we roll out a new deployment in which we can't recover the
state from the previous snapshot/savepoint, there is a possibility of data
loss here, right? This can happen if we modify the existing operators, or
add/delete operators, in such a way that the operator state cannot be
recovered from the snapshot. I think this is a very valid scenario when
rolling out new features into a pipeline.

Any thoughts on this? In such a case, what would be the best practice for
recovering those records?

On Mon, Sep 26, 2022 at 12:55 AM Reuven Lax <re...@google.com> wrote:

> If you are using an exactly-once runner, it will guarantee every message
> is consumed once (though the mechanism might not be obvious).
>
> Generally what happens is that the messages are consumed into the system
> in order. However if you have downstream ParDos, there is no guarantee that
> they process the messages in the same order (especially if there is a
> shuffle operation, such as GroupByKey, in between).
>
> Now a future version of the source might decide to split the Kafka
> partition if it's too large to handle on one thread (e.g. split it in half
> where the first half is bounded and the second half is the growing
> unbounded tail of the partition). In this case the source would keep two
> checkpoints for the current position in each half of the partition. (This
> mode of operation probably wouldn't be compatible with checkpointing
> offsets back to the broker, though.) The source doesn't do this today; I'm
> just mentioning it to point out another way in which things could be
> consumed out of order.
>

Re: [Question] Using KafkaIO without a data loss

Posted by Reuven Lax via user <us...@beam.apache.org>.
If you are using an exactly-once runner, it will guarantee every message is
consumed once (though the mechanism might not be obvious).

Generally what happens is that the messages are consumed into the system in
order. However if you have downstream ParDos, there is no guarantee that
they process the messages in the same order (especially if there is a
shuffle operation, such as GroupByKey, in between).

Now a future version of the source might decide to split the Kafka
partition if it's too large to handle on one thread (e.g. split it in half
where the first half is bounded and the second half is the growing
unbounded tail of the partition). In this case the source would keep two
checkpoints for the current position in each half of the partition. (This
mode of operation probably wouldn't be compatible with checkpointing
offsets back to the broker, though.) The source doesn't do this today; I'm
just mentioning it to point out another way in which things could be
consumed out of order.
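
The hypothetical split described above can be sketched as a toy model (plain
Python, purely illustrative; the range representation and function name are
invented here, and none of this exists in KafkaIO today):

```python
# Toy sketch of splitting one Kafka partition's offset range into a bounded
# first half and an unbounded growing tail, each tracked by its own
# position checkpoint. Illustrative only; not Beam or KafkaIO code.

def split_partition(current_offset, end_offset):
    """Split [current_offset, inf) at end_offset into two ranges,
    each with an independent position checkpoint."""
    bounded = {"start": current_offset, "end": end_offset,
               "position": current_offset}
    unbounded_tail = {"start": end_offset, "end": None,  # grows forever
                      "position": end_offset}
    return bounded, unbounded_tail

first_half, tail = split_partition(current_offset=100, end_offset=5000)
assert first_half["end"] == 5000   # bounded: finishes at the split point
assert tail["end"] is None         # unbounded: keeps growing
```

With two independent position checkpoints, a single committed broker offset
can no longer describe the reader's position, which is why this mode likely
wouldn't be compatible with committing offsets back to the broker.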

On Sun, Sep 25, 2022 at 11:40 AM Yomal de Silva <yo...@gmail.com>
wrote:

> Hi Reuven,
> Thanks for those clarifications.
>
> For the 4th question that I raised, if A fails and B is committed,
> will those messages(A) get consumed again from Kafka or will the messages
> get recovered from the checkpoint and retried in that specific operator?
>

Re: [Question] Using KafkaIO without a data loss

Posted by Yomal de Silva <yo...@gmail.com>.
Hi Reuven,
Thanks for those clarifications.

For the 4th question that I raised, if A fails and B is committed,
will those messages(A) get consumed again from Kafka or will the messages
get recovered from the checkpoint and retried in that specific operator?

On Sun, Sep 25, 2022 at 10:45 PM Reuven Lax via user <us...@beam.apache.org>
wrote:


Re: [Question] Using KafkaIO without a data loss

Posted by Reuven Lax via user <us...@beam.apache.org>.
On Sun, Sep 25, 2022 at 4:56 AM Yomal de Silva <yo...@gmail.com>
wrote:

> Hi all,
>
> I have started using KafkaIO to read a data stream and have the following
> questions. I would appreciate a few clarifications on them.
>
>
1. Does KafkaIO ignore the offset stored in the broker and use the offset
> stored during checkpointing when consuming messages?
>

Generally yes, as that's the only way to guarantee consistency (we can't
atomically commit to the runner and to Kafka). However when starting a new
pipeline, you should be able to start reading at the broker checkpoint.
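
As a toy illustration of that precedence (plain Python, not Beam code; the
function name is made up for this sketch):

```python
# Toy model of the restart behavior described above (illustrative only):
# a reader resumes from its runner checkpoint when one exists, and falls
# back to the broker's committed offset only for a brand-new pipeline.

def resume_offset(runner_checkpoint, broker_committed_offset):
    """Return the offset a restarted reader would resume from."""
    if runner_checkpoint is not None:
        return runner_checkpoint       # existing pipeline: checkpoint wins
    return broker_committed_offset     # new pipeline: start at broker offset

# Existing pipeline restored from a checkpoint: the broker offset is ignored.
assert resume_offset(runner_checkpoint=1200, broker_committed_offset=900) == 1200
# Fresh pipeline with no checkpoint: the broker's committed offset is used.
assert resume_offset(runner_checkpoint=None, broker_committed_offset=900) == 900
```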


> 2. How many threads will be used by the Kafka consumer?
>

This depends somewhat on the runner, but you can expect one thread per
partition.


> 3. If the consumer polls a set of messages A and then later B while A is
> still being processed, is there a possibility of set B finishing before A?
> Does parallelism control this?
>

Yes. Beam doesn't currently have any notion of ordering. All messages are
independent and can be processed at different times (the source also
reserves the right to process different ranges of a single Kafka partition
on different threads, though it doesn't currently do this).
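
A small toy demonstration of that lack of ordering (plain Python threads,
not Beam; the sleep durations are artificial stand-ins for per-element work):

```python
# Demonstrates that independently processed batches can finish out of
# order: batch A takes longer than batch B, so B's result is recorded
# first even though A was "polled" first. Illustrative only.
import threading
import time

completed = []
lock = threading.Lock()

def process(batch_name, duration):
    time.sleep(duration)          # simulate slow per-element processing
    with lock:
        completed.append(batch_name)

a = threading.Thread(target=process, args=("A", 0.2))
b = threading.Thread(target=process, args=("B", 0.01))
a.start(); b.start()              # A is started (polled) first
a.join(); b.join()

print(completed)                  # B finishes before A despite starting later
```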


> 4. In the above scenario if B is committed back to the broker and somehow
> A failed, upon a restart is there any way we can consume A again without
> losing data?
>

Data should never be lost. If B is processed, then you can assume that the
A data is checkpointed inside the Beam runner and will be processed too.
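
A toy sketch of why B finishing first doesn't lose A (plain Python, not
Beam internals; the `ToyRunner` class and its methods are invented for
illustration): the runner durably checkpoints consumed records, so after a
crash anything checkpointed but unfinished is simply retried.

```python
# Toy model (not Beam internals): records are checkpointed on consumption,
# so a restart replays anything whose processing never completed.

class ToyRunner:
    def __init__(self):
        self.checkpointed = []   # durably stored records (survive restart)
        self.finished = set()    # records whose processing completed

    def consume(self, record):
        self.checkpointed.append(record)   # checkpoint before processing

    def finish(self, record):
        self.finished.add(record)

    def restart(self):
        # After a crash, checkpointed-but-unfinished records are retried.
        return [r for r in self.checkpointed if r not in self.finished]

runner = ToyRunner()
runner.consume("A")
runner.consume("B")
runner.finish("B")                  # B completes; A fails mid-processing
assert runner.restart() == ["A"]    # A is replayed, not lost
```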



>
> Thank you.
>
>
>