Posted to user@beam.apache.org by Juan Calvo Ferrándiz <ju...@gmail.com> on 2021/12/09 23:35:50 UTC

Fwd: Kafka manually commit offsets

Morning!

First of all, thanks for all the incredible work you do; it's amazing. Then,
secondly, I'm reaching out for some help or guidance on manually committing
records. I want to do this so I can commit the record at the end of the
pipeline, and not in the read() of the KafkaIO.

Bearing in mind what I have read in this post:
https://lists.apache.org/list?user@beam.apache.org:2021-9:user@beam.apache.org%20kafka%20commit
and thinking of a pipeline similar to the one described, I understand we
can use commitOffsetsInFinalize() to commit offsets in the read(). What I
don't understand is how this helps to commit the offset if we want to do
it at the end, not in the read. Thanks. All comments and suggestions are
more than welcome. :)


*Juan Calvo Ferrándiz*
Data Engineer
Go to LINKEDIN  <https://www.linkedin.com/in/juan-calvo-ferrandiz/>
Go to GITHUB <https://github.com/juancalvof>
Go to MEDIUM <ht...@juancalvoferrandiz>

Re: Kafka manually commit offsets

Posted by Juan Calvo Ferrándiz <ju...@gmail.com>.
Super! Thanks for all this info. I'm testing windowing on a per-key basis,
so that there is a consumer per topic/partition/schema. This way, a bundle
of data from a specific window seems to wait until the previous one has been
processed, independently of the number of records.


*Juan*



Re: Kafka manually commit offsets

Posted by Vincent Marquez <vi...@gmail.com>.
What I mean is, if you want to commit offsets only *after* a
KafkaRecord<K,V> is processed, then you need to limit parallelism to the
number of partitions, as offsets are monotonically increasing *per
partition*.  So if you have only one partition and then split into two
'threads', and T1 handling offsets A-C fails while T2 handling D-G succeeds,
committing T2's offsets would indicate that everything processed on T1 also
succeeded.


*~Vincent*



Re: Kafka manually commit offsets

Posted by Luke Cwik <lc...@google.com>.
I believe you would be able to have parallelism greater than the number of
partitions for most of the pipeline. The checkpoint advancement code is
likely limited to the number of partitions but can be a very small portion
of the pipeline.


Re: Kafka manually commit offsets

Posted by Vincent Marquez <vi...@gmail.com>.
If you want to ensure you have at-least-once processing, I think the
*maximum* amount of parallelization you can have would be the number of
partitions you have, so you'd want to group by partition, process a bundle
of that partition, and then commit the last offset for a given partition.

*~Vincent*
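
A minimal sketch of that grouping step, assuming the Beam Java SDK and a
PCollection<KafkaRecord<K, V>> coming out of KafkaIO.read(); the
"topic:partition" string key and the inline DoFn are just illustrative
choices, not the only way to do it:

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Key each record by its topic and partition so that everything downstream
// (grouping, stateful processing, the final commit) happens per partition.
PCollection<KafkaRecord<String, String>> records = /* output of KafkaIO.read() */;

PCollection<KV<String, KafkaRecord<String, String>>> byPartition =
    records.apply("KeyByPartition", ParDo.of(
        new DoFn<KafkaRecord<String, String>, KV<String, KafkaRecord<String, String>>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            KafkaRecord<String, String> r = c.element();
            c.output(KV.of(r.getTopic() + ":" + r.getPartition(), r));
          }
        }));

From here the per-partition commit logic can run as a stateful DoFn (or a
grouping transform) keyed on that string.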



Re: Kafka manually commit offsets

Posted by Luke Cwik <lc...@google.com>.
Yes, you will need to deal with records being out of order because the
system will process many things in parallel.

You can read the last committed offset from Kafka and compare it against
the offset you have right now. If the offset you have is not the next
offset, you store it in state; if it is, you find the contiguous range of
offsets stored in state starting from this offset, remove them from state,
and commit the last one in that contiguous range.
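
A rough sketch of that bookkeeping with Beam's state API, assuming the input
has already been keyed by topic-partition (for example as a "topic:partition"
string) and that each incoming Long is the offset of an element whose
processing has finished; the actual commit call (for example with your own
Kafka client, as Alexey describes later in this thread) is left as a comment,
and the seeding of the last committed offset is simplified:

import java.util.TreeSet;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch: per-partition bookkeeping of out-of-order offsets. Offsets that
// arrive ahead of a gap are parked in state; once the gap closes, the last
// offset of the contiguous range is committed and the parked offsets dropped.
// State in a stateful DoFn is scoped per key (here, per partition) and window.
class CommitContiguousOffsetsFn extends DoFn<KV<String, Long>, Void> {

  @StateId("lastCommitted")
  private final StateSpec<ValueState<Long>> lastCommittedSpec =
      StateSpecs.value(VarLongCoder.of());

  @StateId("pending")
  private final StateSpec<BagState<Long>> pendingSpec =
      StateSpecs.bag(VarLongCoder.of());

  @ProcessElement
  public void process(
      @Element KV<String, Long> element,
      @StateId("lastCommitted") ValueState<Long> lastCommitted,
      @StateId("pending") BagState<Long> pending) {
    long offset = element.getValue();
    // In practice you would seed this from the offset already committed in Kafka.
    Long committed = lastCommitted.read();
    if (committed != null && offset <= committed) {
      return;  // already covered by an earlier commit
    }

    TreeSet<Long> buffered = new TreeSet<>();
    pending.read().forEach(buffered::add);
    buffered.add(offset);

    long last = (committed != null) ? committed : offset - 1;
    long newLast = last;
    while (buffered.contains(newLast + 1)) {
      newLast++;
      buffered.remove(newLast);
    }

    pending.clear();
    buffered.forEach(pending::add);
    if (newLast > last) {
      lastCommitted.write(newLast);
      // Commit (newLast + 1) as the next offset to consume for this partition,
      // e.g. with your own KafkaConsumer.commitSync(...).
    }
  }
}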


Re: Kafka manually commit offsets

Posted by Juan Calvo Ferrándiz <ju...@gmail.com>.
Thanks Alexey! I understand. Continuing to think about possible solutions
for committing records, I was wondering what happens in this scenario:

When processing windows of data, do they get processed in sequential order,
or is it possible for them to be processed out of order? For example, Window
1 contains 10000 elements of data whereas Window 2 contains 10 elements.
Assuming Window 1 takes a while to process all of that data, is it possible
Window 2 will finish before Window 1?

Thanks again!


Re: Kafka manually commit offsets

Posted by Alexey Romanenko <ar...@gmail.com>.
I answered a similar question on SO a while ago [1], and I hope it will help.

“By default, pipeline.apply(KafkaIO.read()...) will return a PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can get an offset from the KafkaRecord metadata and commit it manually in the way that you need (just don't forget to disable AUTO_COMMIT in KafkaIO.read()).

By manual way, I mean that you should instantiate your own Kafka client in your DoFn, process the input element (a KafkaRecord<K, V>) that was read before, fetch the offset from the KafkaRecord, and commit it with your own client.

Though, you need to make sure that the call to the external API and the offset commit are atomic, to prevent potential data loss (if it's critical).”

[1] https://stackoverflow.com/questions/69272461/how-to-manually-commit-kafka-offset-in-apache-beam-at-the-end-of-specific-dofun/69272880#69272880

—
Alexey
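
To make that concrete, a minimal sketch of such a DoFn, assuming the Beam
Java SDK plus the plain kafka-clients library; the consumer configuration and
its lifecycle (one consumer per DoFn instance, a commit per element) are
illustrative choices rather than the one right way to do it:

import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Sketch: commits the offset of each element with its own Kafka client, at the
// end of the pipeline. Assumes KafkaIO.read() has auto commit disabled and
// commitOffsetsInFinalize() not set, so nothing else commits for this group.
class ManualCommitFn extends DoFn<KafkaRecord<String, String>, Void> {
  // Must be a serializable Map; should contain bootstrap.servers and the same
  // group.id that was passed to KafkaIO.read(), so the commits are visible there.
  private final Map<String, Object> consumerConfig;
  private transient Consumer<byte[], byte[]> consumer;

  ManualCommitFn(Map<String, Object> consumerConfig) {
    this.consumerConfig = consumerConfig;
  }

  @Setup
  public void setup() {
    consumer = new KafkaConsumer<>(
        consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer());
  }

  @ProcessElement
  public void process(@Element KafkaRecord<String, String> record) {
    // ... do the real work (e.g. the external API call) first ...
    TopicPartition tp = new TopicPartition(record.getTopic(), record.getPartition());
    // Kafka convention: the committed offset is the next offset to read, hence +1.
    consumer.commitSync(
        Collections.singletonMap(tp, new OffsetAndMetadata(record.getOffset() + 1)));
  }

  @Teardown
  public void teardown() {
    if (consumer != null) {
      consumer.close();
    }
  }
}

It would be applied at the end of the pipeline, e.g.
records.apply(ParDo.of(new ManualCommitFn(config))).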



Re: Kafka manually commit offsets

Posted by Juan Calvo Ferrándiz <ju...@gmail.com>.
Also, I was wondering if we could end up with some kind of race condition:

Bundle 1 contains messages [1,2,3,4,5].
Bundle 2 contains messages [6,7].

If bundle 2 completes before bundle 1, then it will commit all messages up
to offset 7. If bundle 1 then fails for whatever reason, we potentially lose
that data, right?

Thank you Luke!


*Juan*



Re: Kafka manually commit offsets

Posted by Juan Calvo Ferrándiz <ju...@gmail.com>.
Thanks, Luke, for your quick response. I see, that makes sense. Now I have
two new questions, if I may:
a) How can I get the offsets I want to commit? My investigation is currently
going through getCheckpointMark(); is this correct?
https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/UnboundedSource.UnboundedReader.html#:~:text=has%20been%20called.-,getCheckpointMark,-public%20abstract%C2%A0UnboundedSource

b) With these offsets, I will create a client at the end of the pipeline,
with the Kafka library, and methods such as commitSync() and commitAsync().
Is this correct?
https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html#:~:text=log%20an%20error.-,Asynchronous%20Commit,-One%20drawback%20of

Thanks!!!

*Juan *


On Fri, 10 Dec 2021 at 01:07, Luke Cwik <lc...@google.com> wrote:

> commitOffsetsInFinalize is about committing the offset after the output
> has been durably persisted for the bundle containing the Kafka Read. The
> bundle represents a unit of work over a subgraph of the pipeline. You will
> want to ensure that commitOffsetsInFinalize is disabled and that the Kafka
> consumer config doesn't auto-commit. This will ensure that KafkaIO.Read
> doesn't commit the offsets. Then it is up to your PTransform to perform
> the committing.
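
For reference, a read configured the way Luke describes might look roughly
like this (a sketch, not a definitive setup: the bootstrap servers, topic,
and group id are placeholders, and it assumes the Beam Java SDK with the
standard org.apache.kafka deserializers):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

Pipeline pipeline = /* Pipeline.create(options) */;

// Disable every automatic commit path so that only your own transform commits:
// no commitOffsetsInFinalize(), and the consumer's enable.auto.commit = false.
Map<String, Object> consumerUpdates = new HashMap<>();
consumerUpdates.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
consumerUpdates.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

PCollection<KafkaRecord<String, String>> records =
    pipeline.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker-1:9092")
            .withTopic("my-topic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(consumerUpdates));

Because the read keeps the KafkaRecord metadata, each downstream element still
carries the topic, partition, and offset that the manual commit needs.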