You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/15 00:32:45 UTC
[kafka] branch trunk updated: MINOR: Handle task migrated inside
corruption path (#8667)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b558287 MINOR: Handle task migrated inside corruption path (#8667)
b558287 is described below
commit b558287c0b6cfb8ac11548048ae12ca6b5c63257
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu May 14 17:31:49 2020 -0700
MINOR: Handle task migrated inside corruption path (#8667)
Reviewers: John Roesler <vv...@apache.org>
---
.../streams/processor/internals/StreamThread.java | 41 ++++++++------
.../processor/internals/StreamThreadTest.java | 64 ++++++++++++++++++++++
2 files changed, 88 insertions(+), 17 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d4079f1..fb73a66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -555,28 +555,35 @@ public class StreamThread extends Thread {
} catch (final TaskCorruptedException e) {
log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
"Will close the task as dirty and re-create and bootstrap from scratch.", e);
-
- taskManager.commit(
- taskManager.tasks()
- .values()
- .stream()
- .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
- .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
- .collect(Collectors.toSet())
- );
- taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+ try {
+ taskManager.commit(
+ taskManager.tasks()
+ .values()
+ .stream()
+ .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+ .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
+ .collect(Collectors.toSet())
+ );
+ taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+ } catch (final TaskMigratedException taskMigrated) {
+ handleTaskMigrated(taskMigrated);
+ }
} catch (final TaskMigratedException e) {
- log.warn("Detected that the thread is being fenced. " +
- "This implies that this thread missed a rebalance and dropped out of the consumer group. " +
- "Will close out all assigned tasks and rejoin the consumer group.", e);
-
- taskManager.handleLostAll();
- mainConsumer.unsubscribe();
- subscribeConsumer();
+ handleTaskMigrated(e);
}
}
}
+ private void handleTaskMigrated(final TaskMigratedException e) {
+ log.warn("Detected that the thread is being fenced. " +
+ "This implies that this thread missed a rebalance and dropped out of the consumer group. " +
+ "Will close out all assigned tasks and rejoin the consumer group.", e);
+
+ taskManager.handleLostAll();
+ mainConsumer.unsubscribe();
+ subscribeConsumer();
+ }
+
private void subscribeConsumer() {
if (builder.usesPatternSubscription()) {
mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5542067..1643e03 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -107,6 +107,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -1932,6 +1933,69 @@ public class StreamThreadTest {
)).anyTimes();
expect(taskManager.commit(singleton(task2))).andReturn(0);
+ taskManager.handleCorruption(singletonMap(taskId1, emptySet()));
+ expectLastCall();
+
+ EasyMock.replay(task1, task2, taskManager);
+
+ final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
+ final StreamThread thread = new StreamThread(
+ mockTime,
+ config,
+ null,
+ consumer,
+ consumer,
+ null,
+ null,
+ taskManager,
+ streamsMetrics,
+ internalTopologyBuilder,
+ CLIENT_ID,
+ new LogContext(""),
+ new AtomicInteger(),
+ new AtomicLong(Long.MAX_VALUE)
+ ) {
+ @Override
+ void runOnce() {
+ setState(State.PENDING_SHUTDOWN);
+ throw new TaskCorruptedException(corruptedTasksWithChangelogs);
+ }
+ }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+
+ thread.setState(StreamThread.State.STARTING);
+ thread.runLoop();
+
+ verify(taskManager);
+ }
+
+ @Test
+ public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() {
+ final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+ final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+ final Task task1 = mock(Task.class);
+ final Task task2 = mock(Task.class);
+ final TaskId taskId1 = new TaskId(0, 0);
+ final TaskId taskId2 = new TaskId(0, 2);
+
+ final Map<TaskId, Collection<TopicPartition>> corruptedTasksWithChangelogs = mkMap(
+ mkEntry(taskId1, emptySet())
+ );
+
+ expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
+ expect(task1.id()).andReturn(taskId1).anyTimes();
+ expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
+ expect(task2.id()).andReturn(taskId2).anyTimes();
+
+ expect(taskManager.tasks()).andReturn(mkMap(
+ mkEntry(taskId1, task1),
+ mkEntry(taskId2, task2)
+ )).anyTimes();
+ expect(taskManager.commit(singleton(task2))).andThrow(new TaskMigratedException("Task migrated",
+ new RuntimeException("non-corrupted task migrated")));
+
+ taskManager.handleLostAll();
+ expectLastCall();
+
EasyMock.replay(task1, task2, taskManager);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);