You are viewing a plain-text version of this content. The canonical link to it is not preserved in this plain-text version.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/15 00:32:45 UTC

[kafka] branch trunk updated: MINOR: Handle task migrated inside corruption path (#8667)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b558287  MINOR: Handle task migrated inside corruption path (#8667)
b558287 is described below

commit b558287c0b6cfb8ac11548048ae12ca6b5c63257
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu May 14 17:31:49 2020 -0700

    MINOR: Handle task migrated inside corruption path (#8667)
    
    Reviewers: John Roesler <vv...@apache.org>
---
 .../streams/processor/internals/StreamThread.java  | 41 ++++++++------
 .../processor/internals/StreamThreadTest.java      | 64 ++++++++++++++++++++++
 2 files changed, 88 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d4079f1..fb73a66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -555,28 +555,35 @@ public class StreamThread extends Thread {
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
                              "Will close the task as dirty and re-create and bootstrap from scratch.", e);
-
-                taskManager.commit(
-                    taskManager.tasks()
-                        .values()
-                        .stream()
-                        .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                        .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
-                        .collect(Collectors.toSet())
-                );
-                taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                try {
+                    taskManager.commit(
+                        taskManager.tasks()
+                            .values()
+                            .stream()
+                            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                            .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
+                            .collect(Collectors.toSet())
+                    );
+                    taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                } catch (final TaskMigratedException taskMigrated) {
+                    handleTaskMigrated(taskMigrated);
+                }
             } catch (final TaskMigratedException e) {
-                log.warn("Detected that the thread is being fenced. " +
-                             "This implies that this thread missed a rebalance and dropped out of the consumer group. " +
-                             "Will close out all assigned tasks and rejoin the consumer group.", e);
-
-                taskManager.handleLostAll();
-                mainConsumer.unsubscribe();
-                subscribeConsumer();
+                handleTaskMigrated(e);
             }
         }
     }
 
+    private void handleTaskMigrated(final TaskMigratedException e) {
+        log.warn("Detected that the thread is being fenced. " +
+                     "This implies that this thread missed a rebalance and dropped out of the consumer group. " +
+                     "Will close out all assigned tasks and rejoin the consumer group.", e);
+
+        taskManager.handleLostAll();
+        mainConsumer.unsubscribe();
+        subscribeConsumer();
+    }
+
     private void subscribeConsumer() {
         if (builder.usesPatternSubscription()) {
             mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5542067..1643e03 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -107,6 +107,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -1932,6 +1933,69 @@ public class StreamThreadTest {
         )).anyTimes();
         expect(taskManager.commit(singleton(task2))).andReturn(0);
 
+        taskManager.handleCorruption(singletonMap(taskId1, emptySet()));
+        expectLastCall();
+
+        EasyMock.replay(task1, task2, taskManager);
+
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            consumer,
+            consumer,
+            null,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE)
+        ) {
+            @Override
+            void runOnce() {
+                setState(State.PENDING_SHUTDOWN);
+                throw new TaskCorruptedException(corruptedTasksWithChangelogs);
+            }
+        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+
+        thread.setState(StreamThread.State.STARTING);
+        thread.runLoop();
+
+        verify(taskManager);
+    }
+
+    @Test
+    public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() {
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        final Task task1 = mock(Task.class);
+        final Task task2 = mock(Task.class);
+        final TaskId taskId1 = new TaskId(0, 0);
+        final TaskId taskId2 = new TaskId(0, 2);
+
+        final Map<TaskId, Collection<TopicPartition>> corruptedTasksWithChangelogs = mkMap(
+            mkEntry(taskId1, emptySet())
+        );
+
+        expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
+        expect(task1.id()).andReturn(taskId1).anyTimes();
+        expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
+        expect(task2.id()).andReturn(taskId2).anyTimes();
+
+        expect(taskManager.tasks()).andReturn(mkMap(
+            mkEntry(taskId1, task1),
+            mkEntry(taskId2, task2)
+        )).anyTimes();
+        expect(taskManager.commit(singleton(task2))).andThrow(new TaskMigratedException("Task migrated",
+            new RuntimeException("non-corrupted task migrated")));
+
+        taskManager.handleLostAll();
+        expectLastCall();
+
         EasyMock.replay(task1, task2, taskManager);
 
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);