You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/21 04:34:33 UTC

[kafka] branch trunk updated: KAFKA-9741: Update ConsumerGroupMetadata before calling onPartitionsRevoked() (#8325)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 635b5fd  KAFKA-9741: Update ConsumerGroupMetadata before calling onPartitionsRevoked() (#8325)
635b5fd is described below

commit 635b5fd47c86809401dd8f3c0e9a225526f96555
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Mar 20 21:34:08 2020 -0700

    KAFKA-9741: Update ConsumerGroupMetadata before calling onPartitionsRevoked() (#8325)
    
    If partitions are revoked, an application may want to commit the current offsets.
    
    Using transactions, committing offsets would be done via the producer passing in the current ConsumerGroupMetadata. If the metadata is not updates before the callback, the call to commitTransaction(...) fails as and old generationId would be used.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../consumer/internals/ConsumerCoordinator.java     |  6 +++---
 .../consumer/internals/ConsumerCoordinatorTest.java | 21 +++++++++++++++++++++
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b65a741..b8d44b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -344,6 +344,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
 
+        // Give the assignor a chance to update internal state based on the received assignment
+        groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
+
         Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
 
         Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
@@ -395,9 +398,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // were not explicitly requested, so we update the joined subscription here.
         maybeUpdateJoinedSubscription(assignedPartitions);
 
-        // Give the assignor a chance to update internal state based on the received assignment
-        groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
-
         // Catch any exception here to make sure we could complete the user callback.
         try {
             assignor.onAssignment(assignment, groupMetadata);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 0a6fe1a..9a189a8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2500,6 +2500,27 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void shouldUpdateConsumerGroupMetadataBeforeCallbacks() {
+        final MockRebalanceListener rebalanceListener = new MockRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                assertEquals(2, coordinator.groupMetadata().generationId());
+            }
+        };
+
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        {
+            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(
+                new ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p), ByteBuffer.wrap(new byte[0])));
+            coordinator.onJoinComplete(1, "memberId", partitionAssignor.name(), buffer);
+        }
+
+        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(
+            new ConsumerPartitionAssignor.Assignment(Collections.emptyList(), ByteBuffer.wrap(new byte[0])));
+        coordinator.onJoinComplete(2, "memberId", partitionAssignor.name(), buffer);
+    }
+
+    @Test
     public void testConsumerRejoinAfterRebalance() {
         try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"))) {
             coordinator.ensureActiveGroup();