Posted to jira@kafka.apache.org by "Chris Egerton (Jira)" <ji...@apache.org> on 2023/02/23 13:20:00 UTC

[jira] [Updated] (KAFKA-12558) MM2 may not sync partition offsets correctly

     [ https://issues.apache.org/jira/browse/KAFKA-12558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton updated KAFKA-12558:
----------------------------------
    Fix Version/s: 3.4.1

> MM2 may not sync partition offsets correctly
> --------------------------------------------
>
>                 Key: KAFKA-12558
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12558
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.7.0, 2.6.1
>            Reporter: Alan Ning
>            Priority: Major
>             Fix For: 3.5.0, 3.4.1
>
>
> There is a race condition in {{MirrorSourceTask}} where offset syncs for certain partitions may never be sent. The bug occurs when the [outstandingOffsetSync semaphore is full|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L207]. In this case, {{sendOffsetSync}} [will silently fail|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L207].
> This failure is normally acceptable since offset sync will retry frequently. However, {{maybeSyncOffsets}} has a bug where it will [mutate the partition state|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L199] prior to confirming the result of {{sendOffsetSync}}. The end result is that the partition state is mutated prematurely, as if the sync had been sent, which prevents future offset syncs from recovering.
> Since {{MAX_OUTSTANDING_OFFSET_SYNCS}} is 10, this bug happens when you assign more than 10 partitions to each task.
> In my test cases where I had over 100 partitions per task, the majority of the offsets were wrong. Here's an example of such a failure. https://issues.apache.org/jira/browse/KAFKA-12468?focusedCommentId=17308308&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17308308
> During my troubleshooting, I customized the {{MirrorSourceTask}} to confirm that all partitions that have the wrong offset were failing to acquire the initial semaphore. The condition [can be trapped here|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L208].
> *Possible Fix:*
> A possible fix is to create a {{shouldUpdate}} method in {{PartitionState}}. This method should be read-only and return true if {{sendOffsetSync}} is needed. {{update}} should then be called only after {{sendOffsetSync}} succeeds.
> Here's some pseudocode:
> {code:java}
> private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
>         long downstreamOffset) {
>     PartitionState partitionState =
>         partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
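>     // shouldUpdate is read-only: it only reports whether a sync is needed.
>     if (partitionState.shouldUpdate(upstreamOffset, downstreamOffset)) {
>         // Assumes sendOffsetSync returns true only if the sync was actually sent.
>         if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
>             // Mutate the partition state only after a successful send.
>             partitionState.update(upstreamOffset, downstreamOffset);
>         }
>     }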
> }
> {code}
>  
> *Workaround:*
> For those who are experiencing this issue, the workaround is to make sure each task is assigned no more than 10 partitions. Set your {{tasks.max}} value accordingly.
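
The race described in the issue can be reproduced in isolation. The following is a minimal single-threaded sketch, not the actual MirrorSourceTask code. It assumes a simplified PartitionState that tracks only the last synced offsets, a hypothetical maxOffsetLag of 100, made-up offsets, and it models producer callbacks by releasing all semaphore permits at the end of each round. The class name OffsetSyncSketch and the method unsyncedPartitions are illustrative only.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

public class OffsetSyncSketch {
    static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;

    /** Simplified stand-in for PartitionState (assumption, not Kafka's class). */
    static final class PartitionState {
        final long maxOffsetLag;
        long lastSyncUpstreamOffset = -1L;
        long lastSyncDownstreamOffset = -1L;

        PartitionState(long maxOffsetLag) {
            this.maxOffsetLag = maxOffsetLag;
        }

        /** Read-only check: is a sync needed for these offsets? */
        boolean shouldUpdate(long upstreamOffset, long downstreamOffset) {
            if (lastSyncDownstreamOffset == -1L) {
                return true; // never synced
            }
            long expectedDownstream =
                lastSyncDownstreamOffset + (upstreamOffset - lastSyncUpstreamOffset);
            return downstreamOffset - expectedDownstream >= maxOffsetLag;
        }

        /** Records that a sync was emitted for these offsets. */
        void update(long upstreamOffset, long downstreamOffset) {
            lastSyncUpstreamOffset = upstreamOffset;
            lastSyncDownstreamOffset = downstreamOffset;
        }
    }

    /**
     * Runs the given number of rounds, one record per partition per round,
     * and returns how many partitions never had an offset sync sent.
     * updateBeforeSend = true mimics the reported bug; false mimics the fix.
     */
    public static int unsyncedPartitions(boolean updateBeforeSend, int partitions, int rounds) {
        Semaphore outstanding = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
        PartitionState[] states = new PartitionState[partitions];
        for (int p = 0; p < partitions; p++) {
            states[p] = new PartitionState(100L);
        }
        Set<Integer> synced = new HashSet<>();
        for (int round = 0; round < rounds; round++) {
            int acquired = 0;
            long upstream = round;       // hypothetical offsets
            long downstream = round + 5;
            for (int p = 0; p < partitions; p++) {
                PartitionState state = states[p];
                if (!state.shouldUpdate(upstream, downstream)) {
                    continue;
                }
                if (updateBeforeSend) {
                    state.update(upstream, downstream); // premature mutation
                }
                // Non-blocking acquire: fails silently when the semaphore is full.
                if (outstanding.tryAcquire()) {
                    synced.add(p);
                    acquired++;
                    if (!updateBeforeSend) {
                        state.update(upstream, downstream); // only after a real send
                    }
                }
            }
            // Model the producer callbacks completing before the next round.
            outstanding.release(acquired);
        }
        return partitions - synced.size();
    }

    public static void main(String[] args) {
        System.out.println("update before send: "
            + unsyncedPartitions(true, 100, 10) + " partitions never synced");
        System.out.println("update after send:  "
            + unsyncedPartitions(false, 100, 10) + " partitions never synced");
    }
}
```

Under these assumptions, with 100 partitions and 10 rounds, updating the state before the send leaves 90 partitions without any sync, while updating only after a successful send lets every partition catch up over successive rounds. This matches the report that the problem appears once a task has more than 10 partitions.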



--
This message was sent by Atlassian Jira
(v8.20.10#820010)