Posted to user@beam.apache.org by Deepak Nagaraj <de...@primer.ai> on 2022/06/04 00:46:09 UTC

possible data loss with Kafka I/O

Hi Beam team,

We have seen data loss with Beam pipelines under the following
condition, i.e., Beam thinks it has processed data when in reality it
has not:

  * Kafka consumer config: enable.auto.commit set to true (default),
auto.offset.reset set to latest (default)
  * ReadFromKafka(): commit_offset_in_finalize set to false (default)
  * No successful checkpoints (i.e. every checkpoint times out)

Under this condition, if we post to Kafka, the pipeline starts up and
reads Kafka messages, and the offsets are auto-committed after 5
seconds (Kafka consumer default).

This is usually not a problem, because Beam saves offsets to
checkpoints and uses the offsets in checkpoints upon pipeline restart.

But if the checkpointing never succeeds (e.g. we hit this with a slow
processor pipeline), or if there are no previous checkpoints, then
upon restart Beam starts with the latest Kafka offset. This is a data
loss situation.

We can prevent this problem by making these settings the default:
* ReadFromKafka(): commit_offset_in_finalize to true
* Kafka consumer config: enable.auto.commit to false

If the checkpointing is problematic, this can cause continuous
reprocessing, but it's still better than data loss.
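
For illustration, here is a minimal sketch of ReadFromKafka() with these
settings (the broker address, consumer group, and topic are placeholders):

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka

    with beam.Pipeline() as p:
        messages = p | ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'broker:9092',  # placeholder
                'group.id': 'my-consumer-group',     # placeholder
                # Keep the Kafka client from committing offsets on its own
                # (by default it commits every 5 seconds):
                'enable.auto.commit': 'false',
            },
            topics=['my-topic'],                     # placeholder
            # Commit offsets back to Kafka only when Beam finalizes a
            # checkpoint, so committed offsets never run ahead of the
            # pipeline's saved state:
            commit_offset_in_finalize=True,
        )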

What do you think of this?

Regards,
Deepak

Re: possible data loss with Kafka I/O

Posted by Deepak Nagaraj <de...@primer.ai>.
Hi Cham,

Thank you for your response. One thing I didn't mention earlier is:
all of this is with Beam's Flink runner.

On Sat, Jun 4, 2022 at 9:55 AM Chamikara Jayalath <ch...@google.com> wrote:
>
>>
>>   * Kafka consumer config: enable.auto.commit set to true (default),
>> auto.offset.reset set to latest (default)
>
> To clarify, these are defaults for Kafka, not for Beam, right?
>

You are right, these are the defaults for the Kafka consumer. They can be
overridden within Beam's ReadFromKafka(), via the consumer_config
parameter.

>>   * ReadFromKafka(): commit_offset_in_finalize set to false (default)
>>   * No successful checkpoints (i.e. every checkpoint times out)
>>
>>
>> Under this condition, if we post to Kafka, the pipeline starts up and
>> reads Kafka messages, and the offsets are auto-committed after 5
>> seconds (Kafka consumer default).
>
>
> I think this is again the default behavior of Kafka, right ? So once messages are read (by Beam or otherwise) and "enable.auto.commit" is set, messages are auto-committed? I don't think Beam would have control over this, so possibly the pipeline user should set the Kafka consumer settings to not auto-commit if the runner does not preserve messages that are read.
>

Here it's a bit of a grey area. ReadFromKafka() is provided by Beam, and
commit_offset_in_finalize is a parameter of this function.

Also, enabling checkpoints is part of Beam's Flink runner.

>> But if the checkpointing never succeeds (e.g. we hit this with a slow
>> processor pipeline), or if there are no previous checkpoints, then
>> upon restart Beam starts with the latest Kafka offset. This is a data
>> loss situation.
>
> Could you clarify what you mean by pipeline restart here? For example, Dataflow would ensure a successful checkpoint if a pipeline is manually drained.
>

Sure -- upon a checkpoint failure, the default behavior in Flink is for
the pipeline job to be marked failed and restarted. When the job restarts,
because there have not been any successful checkpoints, Beam starts
reading from the latest offset. This causes data loss.

It seems to me that the system should at least warn the user to set
the relevant config bits correctly when checkpointing is enabled (I
know for sure about Flink, not sure about Dataflow):

If checkpointing is enabled, then when ReadFromKafka() is invoked, warn
about possible data loss unless both of these are true (a rough sketch of
the check follows this list):

a) commit_offset_in_finalize is true in ReadFromKafka(), and
b) enable.auto.commit is false in the Kafka consumer config.
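
As a rough sketch, the check could look like this (a hypothetical helper,
not actual Beam code; names are illustrative):

    import logging

    def warn_if_offsets_can_be_lost(consumer_config, commit_offset_in_finalize):
        # Hypothetical guard, run when ReadFromKafka() is constructed on a
        # checkpointing runner such as Flink. Kafka's enable.auto.commit
        # defaults to true, hence the fallback below.
        auto_commit = str(
            consumer_config.get('enable.auto.commit', 'true')).lower() == 'true'
        if auto_commit or not commit_offset_in_finalize:
            logging.warning(
                'Possible data loss: Kafka offsets may be committed before a '
                'checkpoint succeeds. Set enable.auto.commit=false and '
                'commit_offset_in_finalize=True.')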

We could also choose not to make any code changes and instead document
this under the Flink runner's checkpointing_interval parameter. However,
that creates a bit of distance, because the risk really arises where
ReadFromKafka() is invoked.
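
For reference, here is a minimal sketch of enabling checkpointing on the
Flink runner via that parameter (the interval value is illustrative, in
milliseconds):

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=FlinkRunner',
        '--checkpointing_interval=60000',  # checkpoint every 60 seconds
    ])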

Let me know if I can clarify anything else.

Thanks again,
Deepak

Re: possible data loss with Kafka I/O

Posted by Deepak Nagaraj <de...@primer.ai>.
On Tue, Jun 7, 2022 at 11:21 AM Cristian Constantinescu
<ze...@gmail.com> wrote:
>
> Hey Deepak,
>
> I have observed this too. See point "a" in "Other quirks I found:" in this thread [1].
>
> [1] https://lists.apache.org/thread/ksd4nfjmzmp97hs2zgn2mfpf8fsy0myw
>

Yes! This is exactly what we saw as well. Thanks, Cristian, for the
detailed notes.

Cham, I have opened ticket 21742: https://github.com/apache/beam/issues/21742

Regards,
Deepak

Re: possible data loss with Kafka I/O

Posted by Cristian Constantinescu <ze...@gmail.com>.
Hey Deepak,

I have observed this too. See point "a" in "Other quirks I found:" in this
thread [1].

[1] https://lists.apache.org/thread/ksd4nfjmzmp97hs2zgn2mfpf8fsy0myw

Re: possible data loss with Kafka I/O

Posted by Chamikara Jayalath <ch...@google.com>.
On Tue, Jun 7, 2022 at 11:06 AM Deepak Nagaraj <de...@primer.ai>
wrote:

> Hi Cham,
>
> On Mon, Jun 6, 2022 at 7:18 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>>
>>
>> On Mon, Jun 6, 2022 at 1:08 PM Ahmet Altay <al...@google.com> wrote:
>>
>>>
>>>
>>> On Mon, Jun 6, 2022 at 10:22 AM Chamikara Jayalath <ch...@google.com>
>>> wrote:
>>>
>>>> BTW I think we have already documented this behavior of KafkaIO here:
>>>> https://github.com/apache/beam/blob/4b623313707df8a3c3846412f54edf2e3c947374/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L274
>>>>
>>>> After the first checkpoint, the readers should start reading from the
>>>> last saved offsets, so it seems like the behavior you are observing is
>>>> primarily about picking the starting offset of the reader. I'm not sure
>>>> if we need to make changes to make this safer for Flink pipeline
>>>> restarts.
>>>>
>>>
>>> Would it make sense to make this a warning? It is easy to miss this note
>>> in the javadocs.
>>>
>>
>> Makes sense to me. Deepak, will you be able to send a PR for this and/or
>> create a Jira?
>>
>>
> Thanks -- sure, did you mean GitHub Issues? I've seen my past tickets get
> migrated to GitHub now.
>

Yeah, a GitHub issue. Thanks.


>
> Deepak
>
>

Re: possible data loss with Kafka I/O

Posted by Deepak Nagaraj <de...@primer.ai>.
Hi Cham,

On Mon, Jun 6, 2022 at 7:18 PM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Mon, Jun 6, 2022 at 1:08 PM Ahmet Altay <al...@google.com> wrote:
>
>>
>>
>> On Mon, Jun 6, 2022 at 10:22 AM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> BTW I think we have already documented this behavior of KafkaIO here:
>>> https://github.com/apache/beam/blob/4b623313707df8a3c3846412f54edf2e3c947374/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L274
>>>
>>> After the first checkpoint, the readers should start reading from the
>>> last saved offsets, so it seems like the behavior you are observing is
>>> primarily about picking the starting offset of the reader. I'm not sure
>>> if we need to make changes to make this safer for Flink pipeline
>>> restarts.
>>>
>>
>> Would it make sense to make this a warning? It is easy to miss this note
>> in the javadocs.
>>
>
> Makes sense to me. Deepak, will you be able to send a PR for this and/or
> create a Jira?
>
>
Thanks -- sure, did you mean GitHub Issues? I've seen my past tickets get
migrated to GitHub now.

Deepak

Re: possible data loss with Kafka I/O

Posted by Chamikara Jayalath <ch...@google.com>.
On Mon, Jun 6, 2022 at 1:08 PM Ahmet Altay <al...@google.com> wrote:

>
>
> On Mon, Jun 6, 2022 at 10:22 AM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>> BTW I think we have already documented this behavior of KafkaIO here:
>> https://github.com/apache/beam/blob/4b623313707df8a3c3846412f54edf2e3c947374/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L274
>>
>> After the first checkpoint, the readers should start reading from the
>> last saved offsets, so it seems like the behavior you are observing is
>> primarily about picking the starting offset of the reader. I'm not sure
>> if we need to make changes to make this safer for Flink pipeline
>> restarts.
>>
>
> Would it make sense to make this a warning? It is easy to miss this note
> in the javadocs.
>

Makes sense to me. Deepak, will you be able to send a PR for this and/or
create a Jira?

Thanks,
Cham

Re: possible data loss with Kafka I/O

Posted by Ahmet Altay <al...@google.com>.
On Mon, Jun 6, 2022 at 10:22 AM Chamikara Jayalath <ch...@google.com>
wrote:

> BTW I think we have already documented this behavior of KafkaIO here:
> https://github.com/apache/beam/blob/4b623313707df8a3c3846412f54edf2e3c947374/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L274
>
> After the first checkpoint, the readers should start reading from the
> last saved offsets, so it seems like the behavior you are observing is
> primarily about picking the starting offset of the reader. I'm not sure
> if we need to make changes to make this safer for Flink pipeline
> restarts.
>

Would it make sense to make this a warning? It is easy to miss this note in
the javadocs.

Re: possible data loss with Kafka I/O

Posted by Chamikara Jayalath <ch...@google.com>.
BTW I think we have already documented this behavior of KafkaIO here:
https://github.com/apache/beam/blob/4b623313707df8a3c3846412f54edf2e3c947374/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L274

After the first checkpoint, the readers should start reading from the last
saved offsets, so it seems like the behavior you are observing is primarily
about picking the starting offset of the reader. I'm not sure if we need to
make changes to make this safer for Flink pipeline restarts.

Thanks,
Cham

Re: possible data loss with Kafka I/O

Posted by Deepak Nagaraj <de...@primer.ai>.
On Sat, Jun 4, 2022 at 3:35 PM Chamikara Jayalath <ch...@google.com>
wrote:

> On Sat, Jun 4, 2022 at 1:55 PM Reuven Lax <re...@google.com> wrote:
>
>> Cham, do you know if the Flink runner uses the SDF version or the old
>> version?
>>
>
> I think that depends on whether the experiment "use_deprecated_read" was
> specified or not. If that was specified, Flink will use the old version
> (which will also be overridden by a native Flink Kafka source
> implementation IIRC).
>

Yes, we have this experiment specified. Thanks,
Deepak

Re: possible data loss with Kafka I/O

Posted by Chamikara Jayalath <ch...@google.com>.
On Sat, Jun 4, 2022 at 1:55 PM Reuven Lax <re...@google.com> wrote:

> Cham, do you know if the Flink runner uses the SDF version or the old
> version?
>

I think that depends on whether the experiment "use_deprecated_read" was
specified or not. If that was specified, Flink will use the old version
(which will also be overridden by a native Flink Kafka source
implementation IIRC).
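
For reference, the experiment is passed as a pipeline option; a minimal
sketch (the runner choice is just for illustration):

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=FlinkRunner',
        '--experiments=use_deprecated_read',  # selects the pre-SDF read path
    ])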


Re: possible data loss with Kafka I/O

Posted by Reuven Lax <re...@google.com>.
Cham, do you know if the Flink runner uses the SDF version or the old
version?

Re: possible data loss with Kafka I/O

Posted by Chamikara Jayalath <ch...@google.com>.
Hi Deepak,

On Fri, Jun 3, 2022 at 5:46 PM Deepak Nagaraj <de...@primer.ai>
wrote:

> Hi Beam team,
>
> We have seen data loss with Beam pipelines under the following
> condition, i.e., Beam thinks it has processed data when in reality it
> has not:
>
>   * Kafka consumer config: enable.auto.commit set to true (default),
> auto.offset.reset set to latest (default)
>

To clarify, these are defaults for Kafka, not for Beam, right?


>   * ReadFromKafka(): commit_offset_in_finalize set to false (default)
>   * No successful checkpoints (i.e. every checkpoint times out)


> Under this condition, if we post to Kafka, the pipeline starts up and
> reads Kafka messages, and the offsets are auto-committed after 5
> seconds (Kafka consumer default).
>

I think this is again the default behavior of Kafka, right? So once
messages are read (by Beam or otherwise) and "enable.auto.commit" is set,
messages are auto-committed? I don't think Beam would have control over
this, so possibly the pipeline user should set the Kafka consumer settings
to not auto-commit if the runner does not preserve messages that are read.


>
> This is usually not a problem, because Beam saves offsets to
> checkpoints and uses the offsets in checkpoints upon pipeline restart.
>
> But if the checkpointing never succeeds (e.g. we hit this with a slow
> processor pipeline), or if there are no previous checkpoints, then
> upon restart Beam starts with the latest Kafka offset. This is a data
> loss situation.
>

Could you clarify what you mean by pipeline restart here? For example,
Dataflow would ensure a successful checkpoint if a pipeline is manually
drained.

Thanks,
Cham


> We can prevent this problem by making these settings the default:
> * ReadFromKafka(): commit_offset_in_finalize to true
> * Kafka consumer config: enable.auto.commit to false


> If the checkpointing is problematic, this can cause continuous
> reprocessing, but it's still better than data loss.
>
> What do you think of this?
>
> Regards,
> Deepak
>