You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/08/01 11:00:12 UTC
[kafka] branch trunk updated: KAFKA-14095: Improve handling of sync offset failures in MirrorMaker (#12432)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1cc1e776f7 KAFKA-14095: Improve handling of sync offset failures in MirrorMaker (#12432)
1cc1e776f7 is described below
commit 1cc1e776f703b180f4bd979e8a551805b3bdc94e
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Mon Aug 1 12:59:41 2022 +0200
KAFKA-14095: Improve handling of sync offset failures in MirrorMaker (#12432)
We should not treat UNKNOWN_MEMBER_ID as an unexpected error in the Admin client. In MirrorMaker, check the result of committing offsets and log an useful error message in case that failed with UNKNOWN_MEMBER_ID.
Reviewers: Chris Egerton <fe...@gmail.com>
---
.../internals/AlterConsumerGroupOffsetsHandler.java | 2 ++
.../kafka/connect/mirror/MirrorCheckpointTask.java | 17 ++++++++++++++---
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
index eab2e2bb73..425ed66bd2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
@@ -179,6 +179,8 @@ public class AlterConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Co
case INVALID_GROUP_ID:
case INVALID_COMMIT_OFFSET_SIZE:
case GROUP_AUTHORIZATION_FAILED:
+ // Member level errors.
+ case UNKNOWN_MEMBER_ID:
log.debug("OffsetCommit request for group id {} failed due to error {}.",
groupId.idValue, error);
partitionResults.put(topicPartition, error);
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 959961812e..3e6247334b 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -17,9 +17,11 @@
package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Schema;
@@ -306,9 +308,18 @@ public class MirrorCheckpointTask extends SourceTask {
void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) {
if (targetAdminClient != null) {
- targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
- log.trace("sync-ed the offset for consumer group: {} with {} number of offset entries",
- consumerGroupId, offsetToSync.size());
+ AlterConsumerGroupOffsetsResult result = targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
+ result.all().whenComplete((v, throwable) -> {
+ if (throwable != null) {
+ if (throwable.getCause() instanceof UnknownMemberIdException) {
+ log.warn("Unable to sync offsets for consumer group {}. This is likely caused by consumers currently using this group in the target cluster.", consumerGroupId);
+ } else {
+ log.error("Unable to sync offsets for consumer group {}.", consumerGroupId, throwable);
+ }
+ } else {
+ log.trace("Sync-ed {} offsets for consumer group {}.", offsetToSync.size(), consumerGroupId);
+ }
+ });
}
}