You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/04/13 11:59:44 UTC
[kafka] branch 3.2 updated: KAFKA-13542: Add rebalance reason in Kafka Streams (#12018)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 75b4d06043 KAFKA-13542: Add rebalance reason in Kafka Streams (#12018)
75b4d06043 is described below
commit 75b4d06043d1524f53ecd8a6c8321c5c5c135f52
Author: Hao Li <11...@users.noreply.github.com>
AuthorDate: Wed Apr 13 04:49:31 2022 -0700
KAFKA-13542: Add rebalance reason in Kafka Streams (#12018)
Reviewers: Bruno Cadonna <br...@confluent.io>, David Jacot <dj...@confluent.io>
---
.../org/apache/kafka/streams/processor/internals/StreamThread.java | 6 +++---
.../apache/kafka/streams/processor/internals/StreamThreadTest.java | 5 ++---
2 files changed, 5 insertions(+), 6 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7401e539c4..1685735fc7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -589,7 +589,7 @@ public class StreamThread extends Thread {
runOnce();
if (nextProbingRebalanceMs.get() < time.milliseconds()) {
log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
- mainConsumer.enforceRebalance();
+ mainConsumer.enforceRebalance("Scheduled probing rebalance");
nextProbingRebalanceMs.set(Long.MAX_VALUE);
}
} catch (final TaskCorruptedException e) {
@@ -601,7 +601,7 @@ public class StreamThread extends Thread {
final boolean enforceRebalance = taskManager.handleCorruption(e.corruptedTasks());
if (enforceRebalance && eosEnabled) {
log.info("Active task(s) got corrupted. Triggering a rebalance.");
- mainConsumer.enforceRebalance();
+ mainConsumer.enforceRebalance("Active tasks corrupted");
}
} catch (final TaskMigratedException taskMigrated) {
handleTaskMigrated(taskMigrated);
@@ -643,7 +643,7 @@ public class StreamThread extends Thread {
if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
log.warn("Detected that shutdown was requested. " +
"All clients in this app will now begin to shutdown");
- mainConsumer.enforceRebalance();
+ mainConsumer.enforceRebalance("Shutdown requested");
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c0cf9a20bd..cfbf1e5531 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -514,6 +514,7 @@ public class StreamThreadTest {
final EasyMockConsumerClientSupplier mockClientSupplier = new EasyMockConsumerClientSupplier(mockConsumer);
mockClientSupplier.setCluster(createCluster());
+ mockConsumer.enforceRebalance("Scheduled probing rebalance");
EasyMock.replay(mockConsumer);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
@@ -535,8 +536,6 @@ public class StreamThreadTest {
null
);
- mockConsumer.enforceRebalance();
-
mockClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1L);
thread.start();
@@ -2422,7 +2421,7 @@ public class StreamThreadTest {
expect(task2.id()).andReturn(taskId2).anyTimes();
expect(taskManager.handleCorruption(corruptedTasks)).andReturn(true);
- consumer.enforceRebalance();
+ consumer.enforceRebalance("Active tasks corrupted");
expectLastCall();
EasyMock.replay(task1, task2, taskManager, consumer);