Posted to jira@kafka.apache.org by "Daniel Urban (Jira)" <ji...@apache.org> on 2023/04/19 13:11:00 UTC

[jira] [Commented] (KAFKA-14807) MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the pause of replication of consumer groups

    [ https://issues.apache.org/jira/browse/KAFKA-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714097#comment-17714097 ] 

Daniel Urban commented on KAFKA-14807:
--------------------------------------

[~fisher91] are you running MM2 in dedicated mode, or running the connectors directly in a Connect cluster?

If the latter, you can work around this by passing source.consumer.auto.offset.reset=latest only to MirrorSourceConnector, and not to MirrorCheckpointConnector.
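
As a rough illustration of that Connect-cluster setup, the two connector configs might look like the fragment below. The connector names and cluster aliases are placeholders, and other required settings (bootstrap servers, topics, groups, converters) are left out, so treat it as a sketch rather than a complete config.

```properties
# --- Connector 1 (hypothetical name and aliases): topic replication ---
# The override is set here, so replication starts from the latest offsets.
name=mm2-source
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias=source
target.cluster.alias=target
source.consumer.auto.offset.reset=latest

# --- Connector 2 (hypothetical name and aliases): checkpoint emission ---
# The override is deliberately absent, so it does not apply to this connector.
name=mm2-checkpoint
connector.class=org.apache.kafka.connect.mirror.MirrorCheckpointConnector
source.cluster.alias=source
target.cluster.alias=target
```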

For MM2 dedicated mode, I'm not aware of any workarounds.

> MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the pause of replication of consumer groups
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14807
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14807
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.4.0, 3.3.1, 3.3.2
>         Environment: centos7
>            Reporter: Zhaoli
>            Priority: Major
>
> We use MirrorMaker2 to replicate messages and consumer group offsets from the Kafka cluster `source` to cluster `target`.
> To reduce the load on the source cluster, we added this configuration to MM2 to avoid replicating the entire message history:
> {code:java}
> source.consumer.auto.offset.reset=latest {code}
> After that, we found that the offsets of some consumer groups had stopped replicating.
> What these consumer groups have in common is their EMPTY status, which means they had no active members at that moment. Offset replication for all active consumer groups works normally.
> After reading the source code, we found that this happens because the configuration above also applies to the consumer of the `mm2-offset-syncs` topic, so the `offsetSyncs` map does not contain entries for all topic partitions:
> {code:java}
> private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>(); {code}
> The missing topic partitions cause offset replication for the EMPTY consumer groups to stall, which is not expected.
> {code:java}
> OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
>     Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
>     if (offsetSync.isPresent()) {
>         if (offsetSync.get().upstreamOffset() > upstreamOffset) {
>             // Offset is too far in the past to translate accurately
>             return OptionalLong.of(-1L);
>         }
>         long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
>         return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
>     } else {
>         return OptionalLong.empty();
>     }
> }{code}
>  
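
To make the failure mode described above concrete, here is a minimal, self-contained sketch of the lookup. It is not the Kafka class: `OffsetSyncSketch` and `handleSync` are hypothetical names, partitions are plain strings instead of `TopicPartition`, and each sync is stored as a two-element array. The translation logic follows the quoted `translateDownstream`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Simplified, hypothetical stand-in for the offset sync store described above.
class OffsetSyncSketch {
    // Latest known {upstreamOffset, downstreamOffset} pair per source partition.
    private final Map<String, long[]> offsetSyncs = new HashMap<>();

    // Called once for each record read from the offset-syncs topic.
    void handleSync(String partition, long upstreamOffset, long downstreamOffset) {
        offsetSyncs.put(partition, new long[] {upstreamOffset, downstreamOffset});
    }

    // Same branching as the quoted translateDownstream.
    OptionalLong translateDownstream(String partition, long upstreamOffset) {
        long[] sync = offsetSyncs.get(partition);
        if (sync == null) {
            // No sync record was read for this partition, so nothing can be translated.
            return OptionalLong.empty();
        }
        if (sync[0] > upstreamOffset) {
            // Offset is too far in the past to translate accurately.
            return OptionalLong.of(-1L);
        }
        long upstreamStep = upstreamOffset - sync[0];
        return OptionalLong.of(sync[1] + upstreamStep);
    }
}
```

For example, after `handleSync("topicA-0", 100, 40)`, translating upstream offset 105 on `topicA-0` yields 45. A partition whose sync record was skipped because the consumer started at `latest` (say `topicB-0`) yields an empty result until a new sync record for it arrives.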



--
This message was sent by Atlassian Jira
(v8.20.10#820010)