You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/04/07 00:00:44 UTC

[kafka] branch trunk updated: KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 588e8a5  KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)
588e8a5 is described below

commit 588e8a5be83037225716a78358d0203a9e070168
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Apr 7 01:00:11 2020 +0100

    KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)
    
    On a metadata change for assigned topics, we trigger a rebalance, revoke partitions, and send JoinGroup. If the metadata then reverts to its original value and the JoinGroup fails, we don't resend JoinGroup because we never set `rejoinNeeded`. This PR sets `rejoinNeeded=true` when a rebalance is triggered by a metadata change, ensuring that we retry on failure.
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/ConsumerCoordinator.java    |  5 +-
 .../internals/ConsumerCoordinatorTest.java         | 58 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b8d44b4..ab49837 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -747,11 +747,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         // we need to rejoin if we performed the assignment and metadata has changed;
         // also for those owned-but-no-longer-existed partitions we should drop them as lost
-        if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot))
+        if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
+            requestRejoin();
             return true;
+        }
 
         // we need to join if our subscription has changed since the last join
         if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
+            requestRejoin();
             return true;
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 9a189a8..aaf2962 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -974,6 +974,64 @@ public class ConsumerCoordinatorTest {
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
+    /**
+     * Verifies that the consumer re-joins after a metadata change. If JoinGroup fails
+     * and metadata reverts to its original value, the consumer should still retry JoinGroup.
+     */
+    @Test
+    public void testRebalanceWithMetadataChange() {
+        final String consumerId = "leader";
+        final List<String> topics = Arrays.asList(topic1, topic2);
+        final List<TopicPartition> partitions = Arrays.asList(t1p, t2p);
+        subscriptions.subscribe(toSet(topics), rebalanceListener);
+        client.updateMetadata(TestUtils.metadataUpdateWith(1,
+                Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
+        coordinator.maybeUpdateSubscriptionMetadata();
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<String, List<String>> initialSubscription = singletonMap(consumerId, topics);
+        partitionAssignor.prepare(singletonMap(consumerId, partitions));
+
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        // rejoin will only be set in the next poll call
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(toSet(topics), subscriptions.subscription());
+        assertEquals(toSet(partitions), subscriptions.assignedPartitions());
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertNull(rebalanceListener.revoked);
+        assertEquals(1, rebalanceListener.assignedCount);
+
+        // Change metadata to trigger rebalance.
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
+        coordinator.poll(time.timer(0));
+
+        // Revert metadata to original value. Fail pending JoinGroup. Another
+        // JoinGroup should be sent, which will be completed successfully.
+        client.updateMetadata(TestUtils.metadataUpdateWith(1,
+                Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
+        client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.poll(time.timer(0));
+        assertTrue(coordinator.rejoinNeededOrPending());
+
+        client.respond(joinGroupLeaderResponse(2, consumerId, initialSubscription, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        assertFalse(coordinator.rejoinNeededOrPending());
+        Collection<TopicPartition> revoked = getRevoked(partitions, partitions);
+        assertEquals(revoked.isEmpty() ? 0 : 1, rebalanceListener.revokedCount);
+        assertEquals(revoked.isEmpty() ? null : revoked, rebalanceListener.revoked);
+        assertEquals(2, rebalanceListener.assignedCount);
+        assertEquals(getAdded(partitions, partitions), rebalanceListener.assigned);
+        assertEquals(toSet(partitions), subscriptions.assignedPartitions());
+    }
+
     @Test
     public void testWakeupDuringJoin() {
         final String consumerId = "leader";