You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/04/13 11:59:44 UTC

[kafka] branch 3.2 updated: KAFKA-13542: Add rebalance reason in Kafka Streams (#12018)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 75b4d06043 KAFKA-13542: Add rebalance reason in Kafka Streams (#12018)
75b4d06043 is described below

commit 75b4d06043d1524f53ecd8a6c8321c5c5c135f52
Author: Hao Li <11...@users.noreply.github.com>
AuthorDate: Wed Apr 13 04:49:31 2022 -0700

    KAFKA-13542: Add rebalance reason in Kafka Streams (#12018)
    
    Reviewers: Bruno Cadonna <br...@confluent.io>, David Jacot <dj...@confluent.io>
---
 .../org/apache/kafka/streams/processor/internals/StreamThread.java  | 6 +++---
 .../apache/kafka/streams/processor/internals/StreamThreadTest.java  | 5 ++---
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7401e539c4..1685735fc7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -589,7 +589,7 @@ public class StreamThread extends Thread {
                 runOnce();
                 if (nextProbingRebalanceMs.get() < time.milliseconds()) {
                     log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
-                    mainConsumer.enforceRebalance();
+                    mainConsumer.enforceRebalance("Scheduled probing rebalance");
                     nextProbingRebalanceMs.set(Long.MAX_VALUE);
                 }
             } catch (final TaskCorruptedException e) {
@@ -601,7 +601,7 @@ public class StreamThread extends Thread {
                     final boolean enforceRebalance = taskManager.handleCorruption(e.corruptedTasks());
                     if (enforceRebalance && eosEnabled) {
                         log.info("Active task(s) got corrupted. Triggering a rebalance.");
-                        mainConsumer.enforceRebalance();
+                        mainConsumer.enforceRebalance("Active tasks corrupted");
                     }
                 } catch (final TaskMigratedException taskMigrated) {
                     handleTaskMigrated(taskMigrated);
@@ -643,7 +643,7 @@ public class StreamThread extends Thread {
         if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
             log.warn("Detected that shutdown was requested. " +
                     "All clients in this app will now begin to shutdown");
-            mainConsumer.enforceRebalance();
+            mainConsumer.enforceRebalance("Shutdown requested");
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c0cf9a20bd..cfbf1e5531 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -514,6 +514,7 @@ public class StreamThreadTest {
         final EasyMockConsumerClientSupplier mockClientSupplier = new EasyMockConsumerClientSupplier(mockConsumer);
 
         mockClientSupplier.setCluster(createCluster());
+        mockConsumer.enforceRebalance("Scheduled probing rebalance");
         EasyMock.replay(mockConsumer);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
@@ -535,8 +536,6 @@ public class StreamThreadTest {
             null
         );
 
-        mockConsumer.enforceRebalance();
-
         mockClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1L);
 
         thread.start();
@@ -2422,7 +2421,7 @@ public class StreamThreadTest {
         expect(task2.id()).andReturn(taskId2).anyTimes();
         expect(taskManager.handleCorruption(corruptedTasks)).andReturn(true);
 
-        consumer.enforceRebalance();
+        consumer.enforceRebalance("Active tasks corrupted");
         expectLastCall();
 
         EasyMock.replay(task1, task2, taskManager, consumer);