You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/21 04:37:13 UTC

[kafka] branch 2.5 updated: KAFKA-9741: Update ConsumerGroupMetadata before calling onPartitionsRevoked() (#8325)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 223bb49  KAFKA-9741: Update ConsumerGroupMetadata before calling onPartitionsRevoked() (#8325)
223bb49 is described below

commit 223bb49623cdcdaa6c44e8c875279f88100bfa04
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Mar 20 21:34:08 2020 -0700

    KAFKA-9741: Update ConsumerGroupMetadata before calling onPartitionsRevoked() (#8325)
    
    If partitions are revoked, an application may want to commit the current offsets.
    
    Using transactions, committing offsets would be done via the producer passing in the current ConsumerGroupMetadata. If the metadata is not updated before the callback, the call to commitTransaction(...) fails as an old generationId would be used.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../consumer/internals/ConsumerCoordinator.java     | 14 ++++++++++----
 .../consumer/internals/ConsumerCoordinatorTest.java | 21 +++++++++++++++++++++
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 7a5f627..283e769 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -341,6 +341,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
 
+        // Give the assignor a chance to update internal state based on the received assignment
+        groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
+
         Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
 
         Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
@@ -392,11 +395,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // were not explicitly requested, so we update the joined subscription here.
         maybeUpdateJoinedSubscription(assignedPartitions);
 
-        // give the assignor a chance to update internal state based on the received assignment
-        groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
-        assignor.onAssignment(assignment, groupMetadata);
+        // Catch any exception here to make sure we could complete the user callback.
+        try {
+            assignor.onAssignment(assignment, groupMetadata);
+        } catch (Exception e) {
+            firstException.compareAndSet(null, e);
+        }
 
-        // reschedule the auto commit starting from now
+        // Reschedule the auto commit starting from now
         if (autoCommitEnabled)
             this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a9ecdad..06491eb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2399,6 +2399,27 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void shouldUpdateConsumerGroupMetadataBeforeCallbacks() {
+        final MockRebalanceListener rebalanceListener = new MockRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                assertEquals(2, coordinator.groupMetadata().generationId());
+            }
+        };
+
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        {
+            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(
+                new ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p), ByteBuffer.wrap(new byte[0])));
+            coordinator.onJoinComplete(1, "memberId", partitionAssignor.name(), buffer);
+        }
+
+        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(
+            new ConsumerPartitionAssignor.Assignment(Collections.emptyList(), ByteBuffer.wrap(new byte[0])));
+        coordinator.onJoinComplete(2, "memberId", partitionAssignor.name(), buffer);
+    }
+
+    @Test
     public void testConsumerRejoinAfterRebalance() {
         try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"))) {
             coordinator.ensureActiveGroup();