You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/09/28 23:51:41 UTC

[kafka] branch trunk updated: KAFKA-12486: Enforce Rebalance when a TaskCorruptedException is throw… (#11076)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7e57300  KAFKA-12486: Enforce Rebalance when a TaskCorruptedException is throw… (#11076)
7e57300 is described below

commit 7e573001484427dc73d821cc232a4c3bb3b5f5bb
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Wed Sep 29 05:20:16 2021 +0530

    KAFKA-12486: Enforce Rebalance when a TaskCorruptedException is throw… (#11076)
    
    This PR uses the HighAvailabilityTaskAssignor to avoid downtime on corrupted tasks. The idea is that, when we hit a TaskCorruptedException on an active task and exactly-once semantics (EOS) is enabled, a rebalance is triggered after we've wiped out the corrupted state stores. This allows the assignor to temporarily redirect the task to another client that can resume work on it while the original owner restores the state from scratch.
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../streams/processor/internals/StreamThread.java  |  10 +-
 .../streams/processor/internals/TaskManager.java   |   3 +-
 .../processor/internals/StreamThreadTest.java      | 140 ++++++++++++++++++++-
 3 files changed, 150 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a1da373..addfe15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -310,6 +310,7 @@ public class StreamThread extends Thread {
     // These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
     private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
     private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
+    private final boolean eosEnabled;
 
     public static StreamThread create(final TopologyMetadata topologyMetadata,
                                       final StreamsConfig config,
@@ -547,6 +548,7 @@ public class StreamThread extends Thread {
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
 
         this.numIterations = 1;
+        this.eosEnabled = eosEnabled(config);
     }
 
     private static final class InternalConsumerConfig extends ConsumerConfig {
@@ -609,7 +611,13 @@ public class StreamThread extends Thread {
                 log.warn("Detected the states of tasks " + e.corruptedTasks() + " are corrupted. " +
                          "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
-                    taskManager.handleCorruption(e.corruptedTasks());
+                    // Check whether any active task got corrupted; if so (and EOS is enabled), trigger a rebalance
+                    // once the task corruptions have been handled.
+                    final boolean enforceRebalance = taskManager.handleCorruption(e.corruptedTasks());
+                    if (enforceRebalance && eosEnabled) {
+                        log.info("Active task(s) got corrupted. Triggering a rebalance.");
+                        mainConsumer.enforceRebalance();
+                    }
                 } catch (final TaskMigratedException taskMigrated) {
                     handleTaskMigrated(taskMigrated);
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9269c9d..67113a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -170,7 +170,7 @@ public class TaskManager {
     /**
      * @throws TaskMigratedException
      */
-    void handleCorruption(final Set<TaskId> corruptedTasks) {
+    boolean handleCorruption(final Set<TaskId> corruptedTasks) {
         final Set<Task> corruptedActiveTasks = new HashSet<>();
         final Set<Task> corruptedStandbyTasks = new HashSet<>();
 
@@ -210,6 +210,7 @@ public class TaskManager {
         }
 
         closeDirtyAndRevive(corruptedActiveTasks, true);
+        return !corruptedActiveTasks.isEmpty();
     }
 
     private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, final boolean markAsCorrupted) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e57c565..6337a13 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -148,6 +148,7 @@ public class StreamThreadTest {
     private final String stateDir = TestUtils.tempDirectory().getPath();
     private final MockClientSupplier clientSupplier = new MockClientSupplier();
     private final StreamsConfig config = new StreamsConfig(configProps(false));
+    private final StreamsConfig eosEnabledConfig = new StreamsConfig(configProps(true));
     private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal<>();
     private final ChangelogReader changelogReader = new MockChangelogReader();
     private final StateDirectory stateDirectory = new StateDirectory(config, mockTime, true, false);
@@ -2287,7 +2288,7 @@ public class StreamThreadTest {
         expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task2.id()).andReturn(taskId2).anyTimes();
 
-        taskManager.handleCorruption(corruptedTasks);
+        expect(taskManager.handleCorruption(corruptedTasks)).andReturn(true);
 
         EasyMock.replay(task1, task2, taskManager, consumer);
 
@@ -2468,6 +2469,143 @@ public class StreamThreadTest {
     }
 
     @Test
+    @SuppressWarnings("unchecked")
+    public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask() {
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+        expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+        consumer.subscribe((Collection<String>) anyObject(), anyObject());
+        EasyMock.expectLastCall().anyTimes();
+        consumer.unsubscribe();
+        EasyMock.expectLastCall().anyTimes();
+        EasyMock.replay(consumerGroupMetadata);
+        final Task task1 = mock(Task.class);
+        final Task task2 = mock(Task.class);
+
+        final TaskId taskId1 = new TaskId(0, 0);
+        final TaskId taskId2 = new TaskId(0, 2);
+
+        final Set<TaskId> corruptedTasks = singleton(taskId1);
+
+        expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
+        expect(task1.id()).andReturn(taskId1).anyTimes();
+        expect(task2.state()).andReturn(Task.State.CREATED).anyTimes();
+        expect(task2.id()).andReturn(taskId2).anyTimes();
+        expect(taskManager.handleCorruption(corruptedTasks)).andReturn(true);
+
+        consumer.enforceRebalance();
+        expectLastCall();
+
+        EasyMock.replay(task1, task2, taskManager, consumer);
+
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+        final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            eosEnabledConfig,
+            null,
+            consumer,
+            consumer,
+            null,
+            null,
+            taskManager,
+            streamsMetrics,
+            topologyMetadata,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE),
+            null,
+            HANDLER,
+            null
+        ) {
+            @Override
+            void runOnce() {
+                setState(State.PENDING_SHUTDOWN);
+                throw new TaskCorruptedException(corruptedTasks);
+            }
+        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+
+        thread.setState(StreamThread.State.STARTING);
+        thread.runLoop();
+
+        verify(taskManager);
+        verify(consumer);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask() {
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+        expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+        consumer.subscribe((Collection<String>) anyObject(), anyObject());
+        EasyMock.expectLastCall().anyTimes();
+        consumer.unsubscribe();
+        EasyMock.expectLastCall().anyTimes();
+        EasyMock.replay(consumerGroupMetadata);
+        final Task task1 = mock(Task.class);
+        final Task task2 = mock(Task.class);
+
+        final TaskId taskId1 = new TaskId(0, 0);
+        final TaskId taskId2 = new TaskId(0, 2);
+
+        final Set<TaskId> corruptedTasks = singleton(taskId1);
+
+        expect(task1.state()).andReturn(Task.State.CLOSED).anyTimes();
+        expect(task1.id()).andReturn(taskId1).anyTimes();
+        expect(task2.state()).andReturn(Task.State.CLOSED).anyTimes();
+        expect(task2.id()).andReturn(taskId2).anyTimes();
+        expect(taskManager.handleCorruption(corruptedTasks)).andReturn(false);
+
+        EasyMock.replay(task1, task2, taskManager, consumer);
+
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+        final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            eosEnabledConfig,
+            null,
+            consumer,
+            consumer,
+            null,
+            null,
+            taskManager,
+            streamsMetrics,
+            topologyMetadata,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE),
+            null,
+            HANDLER,
+            null
+        ) {
+            @Override
+            void runOnce() {
+                setState(State.PENDING_SHUTDOWN);
+                throw new TaskCorruptedException(corruptedTasks);
+            }
+        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+
+        thread.setState(StreamThread.State.STARTING);
+        thread.runLoop();
+
+        verify(taskManager);
+        verify(consumer);
+    }
+
+    @Test
     public void shouldNotCommitNonRunningNonRestoringTasks() {
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);