Posted to user@beam.apache.org by Jean Wisser <jw...@flowtraders.com> on 2022/06/06 17:04:32 UTC

[Questions] KafkaIO SplittableDoFn offset management

Hi,


I am trying to use KafkaIO with the new SplittableDoFn, but I am having trouble understanding how offset management is supposed to work (using Beam version 2.36.0).


- The comments in ReadFromKafkaDoFn state that the startOffset of the initial range for a KafkaSourceDescriptor can take the value {@code last committed offset + 1} for the {@link Consumer#position(TopicPartition)}.

But when looking at the implementation of initialRestriction(), the consumer that tries to get the committed offset is built using:

consumerFactoryFn.apply(
    KafkaIOUtils.getOffsetConsumerConfig(
        "initialOffset", offsetConsumerConfig, updatedConsumerConfig))


and getOffsetConsumerConfig() appends a random number to the consumer group name. In addition, KafkaCommitOffset, which is responsible for committing offsets, uses a different consumer group ID.

How is it supposed to get the last committed offset if the group name is not the same?
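To make the first question concrete, here is a self-contained Java sketch (no Kafka or Beam dependency) that models committed offsets as being stored per consumer group and partition, as Kafka does. The in-memory store, the group name "my-pipeline", the partition name and the exact "%s_offset_consumer_%d_%s" naming pattern are assumptions for illustration, modelled on the description above of a name prefix plus a random number; check KafkaIOUtils.getOffsetConsumerConfig() in your Beam version for the real format.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

// Hypothetical in-memory stand-in for the broker's committed-offset storage.
// Offsets are keyed by consumer group AND partition, as in Kafka.
class CommittedOffsetStore {
  private final Map<String, Long> offsets = new HashMap<>();

  void commit(String groupId, String topicPartition, long nextOffset) {
    offsets.put(groupId + "/" + topicPartition, nextOffset);
  }

  Optional<Long> committed(String groupId, String topicPartition) {
    return Optional.ofNullable(offsets.get(groupId + "/" + topicPartition));
  }
}

class GroupIdMismatchDemo {
  // Assumed naming pattern for the offset-lookup consumer's group:
  // prefix + random number + user's group id. Illustrative only.
  static String offsetConsumerGroupId(String name, String userGroupId, Random random) {
    return String.format(
        "%s_offset_consumer_%d_%s", name, random.nextInt(Integer.MAX_VALUE), userGroupId);
  }

  public static void main(String[] args) {
    CommittedOffsetStore store = new CommittedOffsetStore();
    String userGroupId = "my-pipeline"; // hypothetical group.id
    String partition = "events-0";      // hypothetical topic-partition

    // The committing side writes under the user's group id.
    store.commit(userGroupId, partition, 151L);

    // The lookup side reads under a freshly randomized group id and finds nothing.
    String lookupGroupId = offsetConsumerGroupId("initialOffset", userGroupId, new Random());
    System.out.println(lookupGroupId + " -> " + store.committed(lookupGroupId, partition));
    System.out.println(userGroupId + " -> " + store.committed(userGroupId, partition));
  }
}
```

Running main prints an empty result for the randomized group and the committed value 151 for the user's group, which is the mismatch the question describes.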


- I am also trying to commit the offsets manually at the end of my pipeline and get KafkaIO to resume from the last committed offset by using WatchKafkaTopicPartitionDoFn when building KafkaSourceDescriptor.of().

But since, if I understand correctly, a single (topic, partition) can be split across multiple workers, how can we make sure that offsets are always committed in the correct order?
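One way to reason about the ordering concern: Kafka keeps whatever offset a group commits last, so a late commit of a lower offset would move the group backwards. The hypothetical guard below (not part of KafkaIO) only lets a commit through when it advances a partition's offset. It is a sketch that assumes all commit requests for a given partition pass through the same guard instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper, not part of KafkaIO: lets a commit through only if it
// moves a partition's committed offset forward, so a late or reordered request
// can never rewind the group's position.
class MonotonicCommitGuard {
  private final Map<String, Long> highest = new HashMap<>();

  // Returns the offset that should actually be committed, or empty if the
  // request is at or behind what was already committed for that partition.
  synchronized OptionalLong offer(String topicPartition, long nextOffset) {
    Long current = highest.get(topicPartition);
    if (current != null && nextOffset <= current) {
      return OptionalLong.empty();
    }
    highest.put(topicPartition, nextOffset);
    return OptionalLong.of(nextOffset);
  }
}
```

For example, offering 200 and then 150 for the same partition lets 200 through and drops 150. If commits for one partition could originate on several workers, an in-process guard like this is not enough on its own; the requests would first have to be routed to a single place per partition, for example by keying them on the partition.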


Thanks a lot for your help,

Jean.


Jean Wisser
Data Engineer

Flow Traders B.V.

T: +31 20 799 6497
F: +31 20 799 6780

Jacob Bontiusplaats 9
1018 LL Amsterdam
Nederland
www.flowtraders.com

Flow Traders B.V. has its seat in Amsterdam, Nederland, its registered office at Jacob Bontiusplaats 9, 1018 LL, Amsterdam, Nederland and is registered with the Trade Registry of the Chamber of Commerce under number 33.22.3268. This message may contain information that is not intended for you. If you are not the addressee or if this message was sent to you by mistake, you are requested to inform the sender and delete the message. This message may not be forwarded or published to any other person than its addressees without Flow Traders B.V.'s prior consent. Flow Traders B.V. accepts no liability for damage of any kind resulting from the risks inherent in the electronic transmission of messages.

Re: [EXTERNAL] Re: [Questions] KafkaIO SplittableDoFn offset management

Posted by Jean Wisser <jw...@flowtraders.com>.
Hi John,

Thanks for your

________________________________
From: John Casey <jo...@google.com>
Sent: Monday, June 6, 2022 8:19:22 PM
To: user
Subject: [EXTERNAL] Re: [Questions] KafkaIO SplittableDoFn offset management

CAUTION: This Email is from an EXTERNAL source. Ensure you trust this sender before clicking on any links or attachments.
Hi Jean,

I recently fixed some bugs that caused offsets to be committed under incorrect group names; those fixes should be available in the most recent version of Beam (2.38).

For your second question, a single partition can only be consumed by a single worker, so there are no issues with committing offsets in the correct order.
This is because the partition is the unit of parallelism in Kafka, and KafkaIO is currently designed around that.

John

Re: [EXTERNAL] Re: [Questions] KafkaIO SplittableDoFn offset management

Posted by Jean Wisser <jw...@flowtraders.com>.
Hey John,


There is something I don't quite understand about the statement you made previously:

For your second question, a single partition can only be consumed by a single worker, so there are no issues with committing offsets in the correct order.
This is because the partition is the unit of parallelism in Kafka, and KafkaIO is currently designed around that.

The Beam blog post on Splittable DoFn (https://beam.apache.org/blog/splittable-do-fn/) states that:
Splitting produces a primary and residual restriction that add up to the original restriction being split: the current @ProcessElement call keeps processing the primary, and the residual will be processed by another @ProcessElement call. For example, a runner may schedule the residual to be processed in parallel on another worker.

Doesn't that mean that if the first restriction covers offsets [100, 150), the next restriction [150, 200) can be scheduled on a different worker?
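The split described in the quoted passage can be modelled with a toy tracker over a half-open offset range. The Range and SketchOffsetTracker classes below are hypothetical and only loosely follow the idea behind Beam's OffsetRange and OffsetRangeTracker; they are not their implementation. Using the numbers from the question, claiming offsets 100 to 149 of [100, 200) and then splitting at fraction 0 leaves the primary [100, 150) with the current call and returns the residual [150, 200).

```java
// Hypothetical half-open offset range [from, to); not Beam's OffsetRange.
final class Range {
  final long from; // inclusive
  final long to;   // exclusive

  Range(long from, long to) {
    if (from > to) {
      throw new IllegalArgumentException("from > to");
    }
    this.from = from;
    this.to = to;
  }

  @Override
  public String toString() {
    return "[" + from + ", " + to + ")";
  }
}

// Hypothetical, simplified tracker showing how a split yields a primary and a residual.
final class SketchOffsetTracker {
  private Range range;
  private long lastClaimed;

  SketchOffsetTracker(Range range) {
    this.range = range;
    this.lastClaimed = range.from - 1; // nothing claimed yet
  }

  Range currentRestriction() {
    return range;
  }

  // Claims an offset only if it still lies inside the (possibly shrunk) range.
  boolean tryClaim(long offset) {
    if (offset < range.from || offset >= range.to) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }

  // Splits off part of the unclaimed remainder. The current call keeps the
  // primary [from, splitPos); the residual [splitPos, to) is returned so that a
  // runner could process it elsewhere. Returns null if nothing can be given away.
  Range trySplit(double fractionOfRemainder) {
    if (fractionOfRemainder < 0 || fractionOfRemainder > 1) {
      throw new IllegalArgumentException("fraction must be in [0, 1]");
    }
    long firstUnclaimed = lastClaimed + 1;
    long splitPos =
        firstUnclaimed + (long) Math.ceil((range.to - firstUnclaimed) * fractionOfRemainder);
    if (splitPos >= range.to) {
      return null;
    }
    Range residual = new Range(splitPos, range.to);
    range = new Range(range.from, splitPos);
    return residual;
  }
}
```

After the split, tryClaim(150) fails on the original tracker because offset 150 now belongs to the residual. Whether a runner actually performs such splits for a Kafka read, and where it schedules the residual, is runner-specific and not something this sketch decides.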


Thanks for your help.

Jean.

________________________________
From: Jean Wisser
Sent: Tuesday, June 7, 2022 11:31:44 AM
To: user
Subject: Re: [EXTERNAL] Re: [Questions] KafkaIO SplittableDoFn offset management


Hey John,


As suggested, I have created the following issue to track that: https://github.com/apache/beam/issues/21730

Unfortunately I don't have a simple test pipeline, but I think the fix should be straightforward and the same as what you did in KafkaCommitOffset.

I have tested this myself by fixing the consumer group name in initialRestriction(), and it is then able to get the latest committed offset.
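The idea of the fix described here is that the consumer used to look up the committed offset should use the same group.id as the commits. Below is a minimal sketch of building such a config, assuming the user's consumer properties arrive as a Map<String, Object> (as KafkaIO consumer configs do). CommittedOffsetLookupConfig is a hypothetical helper, not Beam's code; only the property names group.id and enable.auto.commit are standard Kafka settings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not Beam's API: builds the config for a consumer whose only
// job is to read the committed offset, keeping the user's group.id so that it
// sees the same offsets that were committed under that group.
final class CommittedOffsetLookupConfig {
  // Standard Kafka consumer property names.
  static final String GROUP_ID = "group.id";
  static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

  private CommittedOffsetLookupConfig() {}

  static Map<String, Object> forCommittedOffsetLookup(Map<String, Object> userConfig) {
    if (userConfig.get(GROUP_ID) == null) {
      throw new IllegalArgumentException("group.id must be set to resume from committed offsets");
    }
    // Copy so the caller's map is not mutated; group.id is carried over unchanged
    // (no random suffix), which is the point of the fix.
    Map<String, Object> config = new HashMap<>(userConfig);
    // The lookup consumer must never commit anything itself.
    config.put(ENABLE_AUTO_COMMIT, false);
    return config;
  }
}
```

With kafka-clients, a consumer built from such a config could then call committed(Set<TopicPartition>) for the descriptor's partition. The change eventually made for issue #21730 may look different.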


Thanks,
Jean.

[Bug]: KafkaIO SplittableDoFn not resuming from last committed offset · Issue #21730 · apache/beam · GitHub (https://github.com/apache/beam/issues/21730)


________________________________
From: John Casey <jo...@google.com>
Sent: Monday, June 6, 2022 10:24:20 PM
To: user
Subject: Re: [EXTERNAL] Re: [Questions] KafkaIO SplittableDoFn offset management

The bugs I fixed were primarily focused on committing back to Kafka, so it may be that there is a similar issue with generating the initial restriction.

Looking at the code, restriction tracking does something similar.

This does sound like a bug to me. Could you raise this as an issue on GitHub? https://github.com/apache/beam/issues
If you have a trivial pipeline that reproduces this, that would be very helpful as well.

Thanks,
John

On Mon, Jun 6, 2022 at 4:06 PM Jean Wisser <jw...@flowtraders.com> wrote:

Hi John,


Thanks for your answer.

I have tried the latest version, 2.39.0.

While I can see that the consumer group in KafkaCommitOffset is now correct and that offsets are being committed to Kafka, each time I restart the pipeline it still starts again from the first offset rather than the last committed one.

I guess the reason is that in ReadFromKafkaDoFn.initialRestriction(), the consumer still uses a different group name (prefixed with "initialOffset").

Am I doing something wrong?
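The restart symptom is what one would expect if the lookup consumer's group has no committed offset: Kafka's Consumer#position() then falls back to the auto.offset.reset policy. The sketch below models that resolution order. The names StartOffsetResolver and ResetPolicy, and the assumption that an explicit start offset takes precedence, are illustrative and not Beam's code; only the earliest and latest policies are modelled (Kafka's "none" policy throws instead).

```java
import java.util.OptionalLong;

// Hypothetical model of start-offset resolution on restart (not Beam's code).
// The committed-offset and reset-policy steps mirror Kafka's documented
// Consumer#position() behaviour; the explicit-offset-first precedence is an assumption.
final class StartOffsetResolver {
  enum ResetPolicy { EARLIEST, LATEST }

  private StartOffsetResolver() {}

  static long resolve(
      OptionalLong explicitStart,     // e.g. a start offset set on the descriptor
      OptionalLong committedForGroup, // committed offset visible to the lookup consumer's group
      ResetPolicy reset,
      long earliestAvailable,
      long endOffset) {
    if (explicitStart.isPresent()) {
      return explicitStart.getAsLong();
    }
    if (committedForGroup.isPresent()) {
      // Kafka commits store the next offset to read, i.e. last processed + 1.
      return committedForGroup.getAsLong();
    }
    // No committed offset for this group: fall back to auto.offset.reset.
    return reset == ResetPolicy.EARLIEST ? earliestAvailable : endOffset;
  }
}
```

With an empty committed offset and EARLIEST, resolve returns the earliest available offset, so the pipeline rereads from the beginning; when the lookup group matches the committing group, the committed offset is returned instead.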


Thanks,

Jean.
