You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink is not preserved in this plain-text copy).
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/01 02:44:50 UTC

[kafka] branch trunk updated: KAFKA-9525: add enforceRebalance method to Consumer API (#8087)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a1f2ece  KAFKA-9525: add enforceRebalance method to Consumer API (#8087)
a1f2ece is described below

commit a1f2ece323e59a751a55386bf06beb5724c56545
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Sat Feb 29 21:44:22 2020 -0500

    KAFKA-9525: add enforceRebalance method to Consumer API (#8087)
    
    As described in KIP-568.
    
    Waiting on acceptance of the KIP to write the tests, on the off chance something changes. But rest assured unit tests are coming ⚡️
    
    Will also kick off existing Streams system tests which leverage this new API (eg version probing, sometimes broker bounce)
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../apache/kafka/clients/consumer/Consumer.java    |  5 +++
 .../kafka/clients/consumer/KafkaConsumer.java      | 33 +++++++++++++++
 .../kafka/clients/consumer/MockConsumer.java       |  4 ++
 .../consumer/internals/AbstractCoordinator.java    |  4 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 43 ++++++++++++++++++-
 .../internals/ConsumerCoordinatorTest.java         | 28 -------------
 .../consumer/internals/MockRebalanceListener.java  | 48 ++++++++++++++++++++++
 .../streams/processor/internals/StreamThread.java  | 11 ++---
 .../tests/streams/streams_upgrade_test.py          |  2 +-
 9 files changed, 138 insertions(+), 40 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index b589fb4..05ac868 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -249,6 +249,11 @@ public interface Consumer<K, V> extends Closeable {
     ConsumerGroupMetadata groupMetadata();

     /**
+     * @see KafkaConsumer#enforceRebalance()
+     */
+    void enforceRebalance();
+
+    /**
      * @see KafkaConsumer#close()
      */
     void close();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f140d6a..0680e67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2248,6 +2248,39 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }

     /**
+     * Alert the consumer to trigger a new rebalance by rejoining the group. This is a non-blocking call that forces
+     * the consumer to trigger a new rebalance on the next {@link #poll(Duration)} call. Note that this API does not
+     * itself initiate the rebalance, so you must still call {@link #poll(Duration)}. If a rebalance is already in
+     * progress this call will be a no-op. If you wish to force an additional rebalance you must complete the current
+     * one by calling poll before retrying this API.
+     * <p>
+     * You do not need to call this during normal processing, as the consumer group will manage itself
+     * automatically and rebalance when necessary. However, there may be situations where the application wishes to
+     * trigger a rebalance that would otherwise not occur. For example, if some condition external and invisible to
+     * the Consumer and its group changes in a way that would affect the userdata encoded in the
+     * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription Subscription}, the Consumer
+     * will not be notified and no rebalance will occur. This API can be used to force the group to rebalance so that
+     * the assignor can perform a partition reassignment based on the latest userdata. If your assignor does not use
+     * this userdata, or you do not use a custom
+     * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor ConsumerPartitionAssignor}, you should not
+     * use this API.
+     *
+     * @throws java.lang.IllegalStateException if the consumer does not have a group (it has no group coordinator)
+     */
+    @Override
+    public void enforceRebalance() {
+        acquireAndEnsureOpen();
+        try {
+            if (coordinator == null) {
+                throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
+            }
+            coordinator.requestRejoin();
+        } finally {
+            release();
+        }
+    }
+
+    /**
      * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
      * If auto-commit is enabled, this will commit the current offsets if possible within the default
      * timeout. See {@link #close(Duration)} for details. Note that {@link #wakeup()}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 67b4e9f..b8579c4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -567,6 +567,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }

     @Override
+    public void enforceRebalance() {
+        // No-op: this mock does not record or simulate rebalance requests.
+    }
+
+    @Override
     public void close(Duration timeout) {
         close();
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index a007017..a1ccb61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -426,7 +426,7 @@ public abstract class AbstractCoordinator implements Closeable {

                 // Generation data maybe concurrently cleared by Heartbeat thread.
                 // Can't use synchronized for {@code onJoinComplete}, because it can be long enough
-                // and  shouldn't block hearbeat thread.
+                // and shouldn't block heartbeat thread.
                 // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment
                 synchronized (AbstractCoordinator.this) {
                     generationSnapshot = this.generation;
@@ -904,7 +904,11 @@ public abstract class AbstractCoordinator implements Closeable {
         resetGeneration();
     }

-    protected synchronized void requestRejoin() {
+    /**
+     * Flags that this member needs to rejoin the group. This only sets a flag; it does not itself send any request.
+     * Public so that {@link org.apache.kafka.clients.consumer.KafkaConsumer#enforceRebalance()} can call it.
+     */
+    public synchronized void requestRejoin() {
         this.rejoinNeeded = true;
     }

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 00d4db4..3e23883 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
@@ -1722,7 +1723,7 @@ public class KafkaConsumerTest {
     }

     @Test(expected = AuthenticationException.class)
-    public void testCommittedAuthenticationFaiure() {
+    public void testCommittedAuthenticationFailure() {
         final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
         consumer.committed(Collections.singleton(tp0)).get(tp0);
     }
@@ -2418,4 +2419,44 @@ public class KafkaConsumerTest {
         assertFalse(consumerMetricPresent(consumer, "time-between-poll-avg"));
         assertFalse(consumerMetricPresent(consumer, "time-between-poll-max"));
     }
+
+    @Test(expected = IllegalStateException.class)
+    public void testEnforceRebalanceWithManualAssignment() {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
+            consumer.assign(singleton(new TopicPartition("topic", 0)));
+            consumer.enforceRebalance();
+        }
+    }
+
+    @Test
+    public void testEnforceRebalanceTriggersRebalanceOnNextPoll() {
+        Time time = new MockTime(1L);
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        MockRebalanceListener countingRebalanceListener = new MockRebalanceListener();
+        initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
+
+        consumer.subscribe(Arrays.asList(topic, topic2), countingRebalanceListener);
+        Node node = metadata.fetch().nodes().get(0);
+        prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
+
+        // a first rebalance to get the assignment, we need two poll calls since we need two round trips to finish join / sync-group
+        consumer.poll(Duration.ZERO);
+        consumer.poll(Duration.ZERO);
+
+        // onPartitionsRevoked is not invoked when first joining the group
+        assertEquals(0, countingRebalanceListener.revokedCount);
+        assertEquals(1, countingRebalanceListener.assignedCount);
+
+        consumer.enforceRebalance();
+
+        // the next poll should trigger a rebalance
+        consumer.poll(Duration.ZERO);
+
+        assertEquals(1, countingRebalanceListener.revokedCount);
+        consumer.close(Duration.ZERO);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index fa2d797..c2edc08 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -2902,31 +2901,4 @@ public class ConsumerCoordinatorTest {
             this.exception = exception;
         }
     }
-
-    private static class MockRebalanceListener implements ConsumerRebalanceListener {
-        public Collection<TopicPartition> lost;
-        public Collection<TopicPartition> revoked;
-        public Collection<TopicPartition> assigned;
-        public int lostCount = 0;
-        public int revokedCount = 0;
-        public int assignedCount = 0;
-
-        @Override
-        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            this.assigned = partitions;
-            assignedCount++;
-        }
-
-        @Override
-        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-            this.revoked = partitions;
-            revokedCount++;
-        }
-
-        @Override
-        public void onPartitionsLost(Collection<TopicPartition> partitions) {
-            this.lost = partitions;
-            lostCount++;
-        }
-    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockRebalanceListener.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockRebalanceListener.java
new file mode 100644
index 0000000..be80254
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockRebalanceListener.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Collection;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Test {@link ConsumerRebalanceListener} that remembers the partitions passed to the most recent
+ * invocation of each callback and counts how many times each callback has been invoked.
+ */
+public class MockRebalanceListener implements ConsumerRebalanceListener {
+    public Collection<TopicPartition> lost;
+    public Collection<TopicPartition> revoked;
+    public Collection<TopicPartition> assigned;
+    public int lostCount = 0;
+    public int revokedCount = 0;
+    public int assignedCount = 0;
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        this.assigned = partitions;
+        assignedCount++;
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        this.revoked = partitions;
+        revokedCount++;
+    }
+
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions) {
+        this.lost = partitions;
+        lostCount++;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 1ec6e7b..4092825 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -753,7 +753,7 @@ public class StreamThread extends Thread {
                     log.info("Version probing detected. Rejoining the consumer group to trigger a new rebalance.");
 
                     assignmentErrorCode.set(AssignorError.NONE.code());
-                    enforceRebalance();
+                    mainConsumer.enforceRebalance();
                 }
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks {} are corrupted. " +
@@ -766,16 +766,11 @@ public class StreamThread extends Thread {
                     "Will close out all assigned tasks and rejoin the consumer group.");
 
                 taskManager.handleLostAll();
-                enforceRebalance();
+                mainConsumer.enforceRebalance();
             }
         }
     }
-
-    private void enforceRebalance() {
-        mainConsumer.unsubscribe();
-        subscribeConsumer();
-    }
-
+
     private void subscribeConsumer() {
         if (builder.usesPatternSubscription()) {
             mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index dada19e..505ab82 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -539,7 +539,7 @@ class StreamsUpgradeTest(Test):
                                                         timeout_sec=60,
                                                         err_msg="Never saw output 'Upgrade metadata to version 7' on" + str(second_other_node.account))
 
-                    log_monitor.wait_until("Version probing detected. Triggering new rebalance.",
+                    log_monitor.wait_until("Version probing detected. Rejoining the consumer group to trigger a new rebalance.",
                                            timeout_sec=60,
-                                           err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
+                                           err_msg="Could not detect 'Rejoining the consumer group to trigger a new rebalance' at upgrading node " + str(node.account))