You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/08/01 11:00:12 UTC

[kafka] branch trunk updated: KAFKA-14095: Improve handling of sync offset failures in MirrorMaker (#12432)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1cc1e776f7 KAFKA-14095: Improve handling of sync offset failures in MirrorMaker (#12432)
1cc1e776f7 is described below

commit 1cc1e776f703b180f4bd979e8a551805b3bdc94e
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Mon Aug 1 12:59:41 2022 +0200

    KAFKA-14095: Improve handling of sync offset failures in MirrorMaker (#12432)
    
    We should not treat UNKNOWN_MEMBER_ID as an unexpected error in the Admin client. In MirrorMaker, check the result of committing offsets and log an useful error message in case that failed with UNKNOWN_MEMBER_ID.
    
    Reviewers: Chris Egerton <fe...@gmail.com>
---
 .../internals/AlterConsumerGroupOffsetsHandler.java     |  2 ++
 .../kafka/connect/mirror/MirrorCheckpointTask.java      | 17 ++++++++++++++---
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
index eab2e2bb73..425ed66bd2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
@@ -179,6 +179,8 @@ public class AlterConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Co
             case INVALID_GROUP_ID:
             case INVALID_COMMIT_OFFSET_SIZE:
             case GROUP_AUTHORIZATION_FAILED:
+            // Member level errors.
+            case UNKNOWN_MEMBER_ID:
                 log.debug("OffsetCommit request for group id {} failed due to error {}.",
                     groupId.idValue, error);
                 partitionResults.put(topicPartition, error);
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 959961812e..3e6247334b 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -17,9 +17,11 @@
 package org.apache.kafka.connect.mirror;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.data.Schema;
@@ -306,9 +308,18 @@ public class MirrorCheckpointTask extends SourceTask {
 
     void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) {
         if (targetAdminClient != null) {
-            targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
-            log.trace("sync-ed the offset for consumer group: {} with {} number of offset entries",
-                      consumerGroupId, offsetToSync.size());
+            AlterConsumerGroupOffsetsResult result = targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
+            result.all().whenComplete((v, throwable) -> {
+                if (throwable != null) {
+                    if (throwable.getCause() instanceof UnknownMemberIdException) {
+                        log.warn("Unable to sync offsets for consumer group {}. This is likely caused by consumers currently using this group in the target cluster.", consumerGroupId);
+                    } else {
+                        log.error("Unable to sync offsets for consumer group {}.", consumerGroupId, throwable);
+                    }
+                } else {
+                    log.trace("Sync-ed {} offsets for consumer group {}.", offsetToSync.size(), consumerGroupId);
+                }
+            });
         }
     }