Posted to dev@beam.apache.org by Alisdair Sullivan <al...@unity3d.com> on 2021/07/21 03:27:53 UTC

KafkaIO SDF reader not respecting consumer group persisted offsets

With the KafkaUnboundedReader, when a consumer is configured with a consumer group id, a call is made to get the consumer group's current position for each topic/partition pair. This offset is used as the initial offset for the read transform. This allows pipelines that persist offsets (either via auto commit configured on the consumer or via the commit-offsets-in-finalize option) to later resume at the same point.

With the SDF implementation of the Kafka reader (ReadSourceDescriptors/ReadFromKafkaDoFn), no equivalent call is made to retrieve offsets. The SDF implementation instead uses the offset consumer to retrieve the initial offset. Because the offset consumer is configured to never commit offsets, this will always be one of earliest, latest, or an offset calculated from a specific time.

Users can provide ReadFromKafkaDoFn with a KafkaSourceDescriptor carrying an explicitly retrieved initial offset, but if processElement fails there is no way to resume from the last persisted offset. Instead, I believe the original offset will be used. In long-lived pipelines this will result in data duplication.
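
To make the difference concrete, here is a toy model of the two start-offset rules as described above. It is only a sketch, not Beam's actual code: the class, method, and field names are all hypothetical, the offsets are made-up values, and the time-based lookup is left out for brevity.

```java
import java.util.OptionalLong;

/** Toy model of initial-offset selection. Hypothetical names; not Beam's implementation. */
final class InitialOffsetModel {

  /** Offsets a broker might report for one topic/partition (illustrative values only). */
  static final class PartitionState {
    final long earliest;
    final long latest;
    final OptionalLong committed; // offset persisted for the consumer group, if any

    PartitionState(long earliest, long latest, OptionalLong committed) {
      this.earliest = earliest;
      this.latest = latest;
      this.committed = committed;
    }
  }

  enum ResetPolicy { EARLIEST, LATEST }

  /** Fallback used when no other starting point is known. */
  static long resetOffset(PartitionState p, ResetPolicy policy) {
    return policy == ResetPolicy.EARLIEST ? p.earliest : p.latest;
  }

  /** Unbounded reader, as described: prefer the group's persisted offset when one exists. */
  static long unboundedReaderStart(PartitionState p, ResetPolicy policy) {
    return p.committed.orElse(resetOffset(p, policy));
  }

  /**
   * SDF read, as described: the persisted offset is never consulted. Only an
   * explicit offset on the descriptor, or the reset policy, decides the start.
   */
  static long sdfStart(PartitionState p, ResetPolicy policy, OptionalLong descriptorStartOffset) {
    return descriptorStartOffset.orElse(resetOffset(p, policy));
  }
}
```

In this model, a partition with a persisted offset of 420 and a latest offset of 1000 starts at 420 under the first rule but at 1000 (or at whatever fixed offset the descriptor carried) under the second, which is the gap I am asking about.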

Is this an intended change, or should I open an issue marking this as a regression in functionality? Thanks.


Re: KafkaIO SDF reader not respecting consumer group persisted offsets

Posted by Boyuan Zhang <bo...@gmail.com>.
Hi Alisdair,

There are two ways to configure the SDF implementation
(ReadSourceDescriptors/ReadFromKafkaDoFn) to commit offsets:

   - Set `enable.auto.commit` in your consumer config, or
   - Configure your KafkaIO with commitOffsetsInFinalize
   <https://github.com/apache/beam/blob/6fcd3391145d5554a87fdb59a72784a26fe8839a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1128-L1137>.
   It will expand the Kafka read with a Kafka commit transform
   <https://github.com/apache/beam/blob/6fcd3391145d5554a87fdb59a72784a26fe8839a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L111-L131>,
   which commits offsets every 5 minutes.
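
As a rough sketch of the first option, the consumer settings could be built as a plain map like the one below. `group.id` and `enable.auto.commit` are standard Kafka consumer config keys; the class name, method name, and group id are placeholders for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds consumer config overrides for auto commit. */
final class CommitConfigSketch {

  static Map<String, Object> autoCommitConfig(String groupId) {
    Map<String, Object> config = new HashMap<>();
    // Offsets are committed per consumer group, so a group id is required.
    config.put("group.id", groupId);
    // Let the Kafka consumer commit offsets periodically on its own.
    config.put("enable.auto.commit", true);
    return config;
  }
}
```

A map like this would typically be handed to the read via `withConsumerConfigUpdates(...)`. The second option needs no consumer setting and is instead enabled by calling `commitOffsetsInFinalize()` on the KafkaIO read.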

I believe this behavior is the same as with KafkaUnboundedReader. That is,
even with KafkaUnboundedReader, you need to provide one of these commit
configurations to your Kafka read so that it commits offsets when it's
time to do so.
