You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/01 02:44:50 UTC
[kafka] branch trunk updated: KAFKA-9525: add enforceRebalance method to Consumer API (#8087)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a1f2ece KAFKA-9525: add enforceRebalance method to Consumer API (#8087)
a1f2ece is described below
commit a1f2ece323e59a751a55386bf06beb5724c56545
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Sat Feb 29 21:44:22 2020 -0500
KAFKA-9525: add enforceRebalance method to Consumer API (#8087)
As described in KIP-568.
Waiting on acceptance of the KIP to write the tests, on the off chance something changes. But rest assured unit tests are coming ⚡️
Will also kick off existing Streams system tests which leverage this new API (e.g., version probing, sometimes broker bounce)
Reviewers: Boyang Chen <bo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../apache/kafka/clients/consumer/Consumer.java | 5 +++
.../kafka/clients/consumer/KafkaConsumer.java | 33 +++++++++++++++
.../kafka/clients/consumer/MockConsumer.java | 4 ++
.../consumer/internals/AbstractCoordinator.java | 4 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 43 ++++++++++++++++++-
.../internals/ConsumerCoordinatorTest.java | 28 -------------
.../consumer/internals/MockRebalanceListener.java | 48 ++++++++++++++++++++++
.../streams/processor/internals/StreamThread.java | 11 ++---
.../tests/streams/streams_upgrade_test.py | 2 +-
9 files changed, 138 insertions(+), 40 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index b589fb4..05ac868 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -249,6 +249,11 @@ public interface Consumer<K, V> extends Closeable {
ConsumerGroupMetadata groupMetadata();
/**
+ * @see KafkaConsumer#enforceRebalance()
+ */
+ void enforceRebalance();
+
+ /**
* @see KafkaConsumer#close()
*/
void close();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f140d6a..0680e67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2248,6 +2248,39 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
+ * Alert the consumer to trigger a new rebalance by rejoining the group. This is a nonblocking call that forces
+ * the consumer to trigger a new rebalance on the next {@link #poll(Duration)} call. Note that this API does not
+ * itself initiate the rebalance, so you must still call {@link #poll(Duration)}. If a rebalance is already in
+ * progress this call will be a no-op. If you wish to force an additional rebalance you must complete the current
+ * one by calling poll before retrying this API.
+ * <p>
+ * You do not need to call this during normal processing, as the consumer group will manage itself
+ * automatically and rebalance when necessary. However there may be situations where the application wishes to
+ * trigger a rebalance that would otherwise not occur. For example, if some condition external and invisible to
+ * the Consumer and its group changes in a way that would affect the userdata encoded in the
+ * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription Subscription}, the Consumer
+ * will not be notified and no rebalance will occur. This API can be used to force the group to rebalance so that
+ * the assignor can perform a partition reassignment based on the latest userdata. If your assignor does not use
+ * this userdata, or you do not use a custom
+ * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor ConsumerPartitionAssignor}, you should not
+ * use this API.
+ *
+ * @throws java.lang.IllegalStateException if the consumer does not use group subscription
+ */
+ @Override
+ public void enforceRebalance() {
+ acquireAndEnsureOpen();
+ try {
+ if (coordinator == null) {
+ throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
+ }
+ coordinator.requestRejoin();
+ } finally {
+ release();
+ }
+ }
+
+ /**
* Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
* If auto-commit is enabled, this will commit the current offsets if possible within the default
* timeout. See {@link #close(Duration)} for details. Note that {@link #wakeup()}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 67b4e9f..b8579c4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -567,6 +567,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
@Override
+ public void enforceRebalance() {
+ }
+
+ @Override
public void close(Duration timeout) {
close();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index a007017..a1ccb61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -426,7 +426,7 @@ public abstract class AbstractCoordinator implements Closeable {
// Generation data maybe concurrently cleared by Heartbeat thread.
// Can't use synchronized for {@code onJoinComplete}, because it can be long enough
- // and shouldn't block hearbeat thread.
+ // and shouldn't block heartbeat thread.
// See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment}
synchronized (AbstractCoordinator.this) {
generationSnapshot = this.generation;
@@ -904,7 +904,7 @@ public abstract class AbstractCoordinator implements Closeable {
resetGeneration();
}
- protected synchronized void requestRejoin() {
+ public synchronized void requestRejoin() {
this.rejoinNeeded = true;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 00d4db4..3e23883 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
@@ -1722,7 +1723,7 @@ public class KafkaConsumerTest {
}
@Test(expected = AuthenticationException.class)
- public void testCommittedAuthenticationFaiure() {
+ public void testCommittedAuthenticationFailure() {
final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
consumer.committed(Collections.singleton(tp0)).get(tp0);
}
@@ -2418,4 +2419,44 @@ public class KafkaConsumerTest {
assertFalse(consumerMetricPresent(consumer, "time-between-poll-avg"));
assertFalse(consumerMetricPresent(consumer, "time-between-poll-max"));
}
+
+ @Test(expected = IllegalStateException.class)
+ public void testEnforceRebalanceWithManualAssignment() {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
+ consumer.assign(singleton(new TopicPartition("topic", 0)));
+ consumer.enforceRebalance();
+ }
+ }
+
+ @Test
+ public void testEnforceRebalanceTriggersRebalanceOnNextPoll() {
+ Time time = new MockTime(1L);
+ SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+ ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ MockRebalanceListener countingRebalanceListener = new MockRebalanceListener();
+ initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
+
+ consumer.subscribe(Arrays.asList(topic, topic2), countingRebalanceListener);
+ Node node = metadata.fetch().nodes().get(0);
+ prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
+
+ // a first rebalance to get the assignment, we need two poll calls since we need two round trips to finish join / sync-group
+ consumer.poll(Duration.ZERO);
+ consumer.poll(Duration.ZERO);
+
+ // onPartitionsRevoked is not invoked when first joining the group
+ assertEquals(countingRebalanceListener.revokedCount, 0);
+ assertEquals(countingRebalanceListener.assignedCount, 1);
+
+ consumer.enforceRebalance();
+
+ // the next poll should trigger a rebalance
+ consumer.poll(Duration.ZERO);
+
+ assertEquals(countingRebalanceListener.revokedCount, 1);
+ }
+
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index fa2d797..c2edc08 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -2902,31 +2901,4 @@ public class ConsumerCoordinatorTest {
this.exception = exception;
}
}
-
- private static class MockRebalanceListener implements ConsumerRebalanceListener {
- public Collection<TopicPartition> lost;
- public Collection<TopicPartition> revoked;
- public Collection<TopicPartition> assigned;
- public int lostCount = 0;
- public int revokedCount = 0;
- public int assignedCount = 0;
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- this.assigned = partitions;
- assignedCount++;
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- this.revoked = partitions;
- revokedCount++;
- }
-
- @Override
- public void onPartitionsLost(Collection<TopicPartition> partitions) {
- this.lost = partitions;
- lostCount++;
- }
- }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockRebalanceListener.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockRebalanceListener.java
new file mode 100644
index 0000000..be80254
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockRebalanceListener.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Collection;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+
+public class MockRebalanceListener implements ConsumerRebalanceListener {
+ public Collection<TopicPartition> lost;
+ public Collection<TopicPartition> revoked;
+ public Collection<TopicPartition> assigned;
+ public int lostCount = 0;
+ public int revokedCount = 0;
+ public int assignedCount = 0;
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ this.assigned = partitions;
+ assignedCount++;
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ this.revoked = partitions;
+ revokedCount++;
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions) {
+ this.lost = partitions;
+ lostCount++;
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 1ec6e7b..4092825 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -753,7 +753,7 @@ public class StreamThread extends Thread {
log.info("Version probing detected. Rejoining the consumer group to trigger a new rebalance.");
assignmentErrorCode.set(AssignorError.NONE.code());
- enforceRebalance();
+ mainConsumer.enforceRebalance();
}
} catch (final TaskCorruptedException e) {
log.warn("Detected the states of tasks {} are corrupted. " +
@@ -766,16 +766,11 @@ public class StreamThread extends Thread {
"Will close out all assigned tasks and rejoin the consumer group.");
taskManager.handleLostAll();
- enforceRebalance();
+ mainConsumer.enforceRebalance();
}
}
}
-
- private void enforceRebalance() {
- mainConsumer.unsubscribe();
- subscribeConsumer();
- }
-
+
private void subscribeConsumer() {
if (builder.usesPatternSubscription()) {
mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index dada19e..505ab82 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -539,7 +539,7 @@ class StreamsUpgradeTest(Test):
timeout_sec=60,
err_msg="Never saw output 'Upgrade metadata to version 7' on" + str(second_other_node.account))
- log_monitor.wait_until("Version probing detected. Triggering new rebalance.",
+ log_monitor.wait_until("Version probing detected. Rejoining the consumer group to trigger a new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))