You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/09/08 22:05:51 UTC
[kafka] branch trunk updated: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (#12599)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b7f20be809 KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (#12599)
b7f20be809 is described below
commit b7f20be809c47202ace4b54e69be5428012df1a9
Author: David Jacot <dj...@confluent.io>
AuthorDate: Fri Sep 9 00:05:40 2022 +0200
KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (#12599)
The consumer group instance ID is used to support a notion of "static" consumer groups. The idea is to be able to identify the same group instance across restarts so that a rebalance is not needed. However, if the user sets `group.instance.id` in the consumer configuration, but uses "simple" assignment with `assign()`, then the instance ID nevertheless is sent in the OffsetCommit request to the coordinator. This may result in a surprising UNKNOWN_MEMBER_ID error.
This PR fixes the issue on the client side by not setting the group instance id if the member id is empty (no generation).
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../consumer/internals/ConsumerCoordinator.java | 5 ++++-
.../internals/ConsumerCoordinatorTest.java | 26 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9838e7dc8f..5228c60e0f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1272,8 +1272,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
final Generation generation;
+ final String groupInstanceId;
if (subscriptions.hasAutoAssignedPartitions()) {
generation = generationIfStable();
+ groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
// if the generation is null, we are not part of an active group (and we expect to be).
// the only thing we can do is fail the commit and let the user rejoin the group in poll().
if (generation == null) {
@@ -1293,6 +1295,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
} else {
generation = Generation.NO_GENERATION;
+ groupInstanceId = null;
}
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
@@ -1300,7 +1303,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
.setGroupId(this.rebalanceConfig.groupId)
.setGenerationId(generation.generationId)
.setMemberId(generation.memberId)
- .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
+ .setGroupInstanceId(groupInstanceId)
.setTopics(new ArrayList<>(requestTopicDataMap.values()))
);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 829fd2dbf0..06584aef46 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2820,6 +2820,32 @@ public abstract class ConsumerCoordinatorTest {
assertEquals(newGen, coordinator.generation());
}
+ @Test
+ public void testCommitOffsetShouldNotSetInstanceIdIfMemberIdIsUnknown() {
+ rebalanceConfig = buildRebalanceConfig(groupInstanceId);
+ ConsumerCoordinator coordinator = buildCoordinator(
+ rebalanceConfig,
+ new Metrics(),
+ assignors,
+ false,
+ subscriptions
+ );
+
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(5000));
+
+ client.prepareResponse(body -> {
+ OffsetCommitRequestData data = ((OffsetCommitRequest) body).data();
+ return data.groupInstanceId() == null && data.memberId().isEmpty();
+ }, offsetCommitResponse(Collections.emptyMap()));
+
+ RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
+ new OffsetAndMetadata(100L, "metadata")));
+
+ assertTrue(consumerClient.poll(future, time.timer(5000)));
+ assertFalse(future.failed());
+ }
+
@Test
public void testCommitOffsetRebalanceInProgress() {
// we cannot retry if a rebalance occurs before the commit completed