Posted to commits@kafka.apache.org by mj...@apache.org on 2020/07/11 18:48:12 UTC
[kafka] branch 2.6 updated: KAFKA-10247: Correctly reset state when task is corrupted (#8994)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 57a9797 KAFKA-10247: Correctly reset state when task is corrupted (#8994)
57a9797 is described below
commit 57a9797f578877f975987e518d190f9cc7bab18a
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sat Jul 11 13:39:23 2020 -0500
KAFKA-10247: Correctly reset state when task is corrupted (#8994)
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../streams/processor/internals/StreamThread.java | 69 +++++++++-----
.../kafka/streams/processor/internals/Task.java | 4 +
.../streams/processor/internals/TaskManager.java | 38 ++++++++
.../processor/internals/StreamThreadTest.java | 105 +++++++++++++++++++++
.../processor/internals/TaskManagerTest.java | 44 ++++++++-
5 files changed, 232 insertions(+), 28 deletions(-)
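
In outline, the fix lets a StreamThread recover a corrupted task in place: the run loop catches the TaskCorruptedException, and the TaskManager closes the task dirty, rewinds its input partitions, and revives it for re-initialization. A condensed, paraphrased sketch of that control flow, based on the diff below and the 2.6 run loop (not a verbatim excerpt):

    try {
        runOnce();
    } catch (final TaskCorruptedException e) {
        // Close the corrupted tasks dirty, reset their input positions, and revive them.
        taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
    }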
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 838223b..833af28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -95,7 +95,7 @@ public class StreamThread extends Thread {
* | | Assigned (3)| <----+
* | +-----+-------+ |
* | | |
- * | | |
+ * | |--------------+
* | v |
* | +-----+-------+ |
* | | Running (4) | ---->+
@@ -137,7 +137,7 @@ public class StreamThread extends Thread {
STARTING(2, 3, 5), // 1
PARTITIONS_REVOKED(2, 3, 5), // 2
PARTITIONS_ASSIGNED(2, 3, 4, 5), // 3
- RUNNING(2, 3, 5), // 4
+ RUNNING(2, 3, 4, 5), // 4
PENDING_SHUTDOWN(6), // 5
DEAD; // 6
@@ -359,9 +359,10 @@ public class StreamThread extends Thread {
consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
- String originalReset = null;
- if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
- originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+
+ final String originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+ // If there are any overrides, we never fall through to the consumer, but only handle offset management ourselves.
+ if (!builder.latestResetTopicsPattern().pattern().isEmpty() || !builder.earliestResetTopicsPattern().pattern().isEmpty()) {
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
}
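
For context: setting auto.offset.reset to "none" makes the consumer throw an InvalidOffsetException (concretely, a NoOffsetForPartitionException) when a partition has no valid position, instead of resetting silently; that exception is what routes control into the resetOffsets logic below. A minimal standalone sketch of this consumer-side mechanism (topic name, group id, and bootstrap server are hypothetical):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class ResetNoneSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "reset-none-demo");         // hypothetical
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singleton("input-topic"));          // hypothetical
                try {
                    // The exception surfaces once partitions are assigned and a position is needed.
                    consumer.poll(Duration.ofMillis(100));
                } catch (final NoOffsetForPartitionException e) {
                    // No committed offset and no automatic reset: the application decides.
                    consumer.seekToBeginning(e.partitions());
                }
            }
        }
    }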
@@ -386,6 +387,8 @@ public class StreamThread extends Thread {
nextScheduledRebalanceMs
);
+ taskManager.setPartitionResetter(partitions -> streamThread.resetOffsets(partitions, null));
+
return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
}
@@ -648,7 +651,9 @@ public class StreamThread extends Thread {
// only try to initialize the assigned tasks
// if the state is still in PARTITIONS_ASSIGNED after the poll call
- if (state == State.PARTITIONS_ASSIGNED) {
+ if (state == State.PARTITIONS_ASSIGNED
+ || state == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+
// transitioning to restore-active is idempotent, so we can call it multiple times
changelogReader.enforceRestoreActive();
@@ -760,17 +765,17 @@ public class StreamThread extends Thread {
try {
records = mainConsumer.poll(pollTime);
} catch (final InvalidOffsetException e) {
- resetInvalidOffsets(e);
+ resetOffsets(e.partitions(), e);
}
return records;
}
- private void resetInvalidOffsets(final InvalidOffsetException e) {
- final Set<TopicPartition> partitions = e.partitions();
+ private void resetOffsets(final Set<TopicPartition> partitions, final Exception cause) {
final Set<String> loggedTopics = new HashSet<>();
final Set<TopicPartition> seekToBeginning = new HashSet<>();
final Set<TopicPartition> seekToEnd = new HashSet<>();
+ final Set<TopicPartition> notReset = new HashSet<>();
for (final TopicPartition partition : partitions) {
if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
@@ -778,26 +783,44 @@ public class StreamThread extends Thread {
} else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics);
} else {
- if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
- final String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
- " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
- "policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))";
- throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), e);
- }
-
- if (originalReset.equals("earliest")) {
+ if ("earliest".equals(originalReset)) {
addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
- } else { // can only be "latest"
+ } else if ("latest".equals(originalReset)) {
addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
+ } else {
+ notReset.add(partition);
}
}
}
- if (!seekToBeginning.isEmpty()) {
- mainConsumer.seekToBeginning(seekToBeginning);
- }
- if (!seekToEnd.isEmpty()) {
- mainConsumer.seekToEnd(seekToEnd);
+ if (notReset.isEmpty()) {
+ if (!seekToBeginning.isEmpty()) {
+ mainConsumer.seekToBeginning(seekToBeginning);
+ }
+
+ if (!seekToEnd.isEmpty()) {
+ mainConsumer.seekToEnd(seekToEnd);
+ }
+ } else {
+ final String notResetString =
+ notReset.stream()
+ .map(TopicPartition::topic)
+ .distinct()
+ .collect(Collectors.joining(","));
+
+ final String format = String.format(
+ "No valid committed offset found for input [%s] and no valid reset policy configured." +
+ " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
+ "policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or " +
+ "StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))",
+ notResetString
+ );
+
+ if (cause == null) {
+ throw new StreamsException(format);
+ } else {
+ throw new StreamsException(format, cause);
+ }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 103c231..b6ed143 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -101,6 +101,10 @@ public interface Task {
State state();
+ default boolean needsInitializationOrRestoration() {
+ return state() == State.CREATED || state() == State.RESTORING;
+ }
+
boolean isActive();
boolean isClosed();
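
Making the new predicate a default method means every Task implementation (StreamTask, StandbyTask, and the test stubs) inherits it from state() alone, with no further changes. A self-contained illustration of the pattern, using hypothetical types rather than the real org.apache.kafka classes:

    interface TaskLike {
        enum State { CREATED, RESTORING, RUNNING }
        State state();
        default boolean needsInitializationOrRestoration() {
            return state() == State.CREATED || state() == State.RESTORING;
        }
    }

    public class DefaultMethodSketch {
        public static void main(final String[] args) {
            // Single abstract method, so a lambda suffices as an implementation.
            final TaskLike restoring = () -> TaskLike.State.RESTORING;
            final TaskLike running = () -> TaskLike.State.RUNNING;
            System.out.println(restoring.needsInitializationOrRestoration()); // true
            System.out.println(running.needsInitializationOrRestoration());   // false
        }
    }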
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c52ebdf..ae1dca5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.kafka.common.utils.Utils.intersection;
import static org.apache.kafka.common.utils.Utils.union;
import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;
@@ -88,6 +89,7 @@ public class TaskManager {
// includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
+ private java.util.function.Consumer<Set<TopicPartition>> resetter;
TaskManager(final ChangelogReader changelogReader,
final UUID processId,
@@ -193,6 +195,34 @@ public class TaskManager {
log.error("Error suspending corrupted task {} ", task.id(), swallow);
}
task.closeDirty();
+ if (task.isActive()) {
+ // Pause so we won't poll any more records for this task until it has been re-initialized
+ // Note: closeDirty already clears the partition group for the task.
+ final Set<TopicPartition> currentAssignment = mainConsumer().assignment();
+ final Set<TopicPartition> taskInputPartitions = task.inputPartitions();
+ final Set<TopicPartition> assignedToPauseAndReset =
+ intersection(HashSet::new, currentAssignment, taskInputPartitions);
+ if (!assignedToPauseAndReset.equals(taskInputPartitions)) {
+ log.warn(
+ "Expected the current consumer assignment {} to contain the input partitions {}. " +
+ "Will proceed to recover.",
+ currentAssignment,
+ taskInputPartitions
+ );
+ }
+
+ mainConsumer().pause(assignedToPauseAndReset);
+ final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer().committed(assignedToPauseAndReset);
+ for (final Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committed.entrySet()) {
+ final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue();
+ if (offsetAndMetadata != null) {
+ mainConsumer().seek(committedEntry.getKey(), offsetAndMetadata);
+ assignedToPauseAndReset.remove(committedEntry.getKey());
+ }
+ }
+ // throws if anything has no configured reset policy
+ resetter.accept(assignedToPauseAndReset);
+ }
task.revive();
}
}
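
In prose, the recovery sequence above is: pause the corrupted task's assigned input partitions, rewind every partition that has a committed offset back to that offset, and hand any partition without one to the resetter, which applies the configured reset policy or throws (per resetOffsets above). A condensed standalone distillation, with a hypothetical class and method name:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    final class CorruptionResetSketch {
        // Hypothetical distillation of the pause/seek/reset steps in handleCorruption above.
        static void resetToLastCommitted(final Consumer<byte[], byte[]> consumer,
                                         final Set<TopicPartition> partitions,
                                         final java.util.function.Consumer<Set<TopicPartition>> resetter) {
            consumer.pause(partitions); // stop fetching until the task is revived and re-initialized
            final Set<TopicPartition> noCommittedOffset = new HashSet<>(partitions);
            final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);
            for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
                if (entry.getValue() != null) {
                    consumer.seek(entry.getKey(), entry.getValue()); // rewind to the committed position
                    noCommittedOffset.remove(entry.getKey());
                }
            }
            resetter.accept(noCommittedOffset); // apply the reset policy; throws if none is configured
        }
    }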
@@ -1140,4 +1170,12 @@ public class TaskManager {
throw e; },
e -> log.debug("Ignoring error in unclean {}", name));
}
+
+ boolean needsInitializationOrRestoration() {
+ return tasks().values().stream().anyMatch(Task::needsInitializationOrRestoration);
+ }
+
+ public void setPartitionResetter(final java.util.function.Consumer<Set<TopicPartition>> resetter) {
+ this.resetter = resetter;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c17dd69..a076e6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -67,6 +67,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
@@ -94,12 +95,14 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -1178,6 +1181,108 @@ public class StreamThreadTest {
}
@Test
+ public void shouldReinitializeRevivedTasksInAnyState() {
+ final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(false)), false);
+
+ final String storeName = "store";
+ final String storeChangelog = "stream-thread-test-store-changelog";
+ final TopicPartition storeChangelogTopicPartition = new TopicPartition(storeChangelog, 1);
+
+ internalTopologyBuilder.addSource(null, "name", null, null, null, topic1);
+ final AtomicBoolean shouldThrow = new AtomicBoolean(false);
+ final AtomicBoolean processed = new AtomicBoolean(false);
+ internalTopologyBuilder.addProcessor("proc", new ProcessorSupplier<Object, Object>() {
+ @Override
+ public Processor<Object, Object> get() {
+ return new Processor<Object, Object>() {
+ private ProcessorContext context;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public void process(final Object key, final Object value) {
+ if (shouldThrow.get()) {
+ throw new TaskCorruptedException(singletonMap(task1, new HashSet<TopicPartition>(singleton(storeChangelogTopicPartition))));
+ } else {
+ processed.set(true);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+ }
+ }, "name");
+ internalTopologyBuilder.addStateStore(
+ Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore(storeName),
+ Serdes.String(),
+ Serdes.String()
+ ),
+ "proc"
+ );
+
+ thread.setState(StreamThread.State.STARTING);
+ thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
+
+ final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+ final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+ // assign single partition
+ assignedPartitions.add(t1p1);
+ activeTasks.put(task1, Collections.singleton(t1p1));
+
+ thread.taskManager().handleAssignment(activeTasks, emptyMap());
+
+ final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
+ mockConsumer.assign(assignedPartitions);
+ mockConsumer.updateBeginningOffsets(mkMap(
+ mkEntry(t1p1, 0L)
+ ));
+
+ final MockConsumer<byte[], byte[]> restoreConsumer = (MockConsumer<byte[], byte[]>) thread.restoreConsumer();
+ restoreConsumer.updateBeginningOffsets(mkMap(
+ mkEntry(storeChangelogTopicPartition, 0L)
+ ));
+ final MockAdminClient admin = (MockAdminClient) thread.adminClient();
+ admin.updateEndOffsets(singletonMap(storeChangelogTopicPartition, 0L));
+
+ thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
+
+
+ // the first iteration completes the restoration
+ thread.runOnce();
+ assertThat(thread.activeTasks().size(), equalTo(1));
+
+ // the second iteration transitions to running and unpauses the input
+ thread.runOnce();
+
+ // the third actually polls, processes the record, and throws the corruption exception
+ addRecord(mockConsumer, 0L);
+ shouldThrow.set(true);
+ final TaskCorruptedException taskCorruptedException = assertThrows(TaskCorruptedException.class, thread::runOnce);
+
+ // Now, we can handle the corruption
+ thread.taskManager().handleCorruption(taskCorruptedException.corruptedTaskWithChangelogs());
+
+ // again, complete the restoration
+ thread.runOnce();
+ // transition to running and unpause
+ thread.runOnce();
+ // process the record
+ addRecord(mockConsumer, 0L);
+ shouldThrow.set(false);
+ assertThat(processed.get(), is(false));
+ thread.runOnce();
+ assertThat(processed.get(), is(true));
+ }
+
+ @Test
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting() {
// only have source but no sink so that we would not get fenced in producer.send
internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 36730b5..d7ea5ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -92,6 +92,7 @@ import static org.easymock.EasyMock.resetToStrict;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
@@ -566,8 +567,15 @@ public class TaskManagerTest {
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
+ expect(consumer.assignment()).andReturn(taskId00Partitions);
+ consumer.pause(taskId00Partitions);
+ expectLastCall();
+ final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
+ expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
+ consumer.seek(t1p0, offsetAndMetadata);
+ expectLastCall();
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
-
+ taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@@ -578,6 +586,7 @@ public class TaskManagerTest {
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(stateManager);
+ verify(consumer);
}
@Test
@@ -598,7 +607,14 @@ public class TaskManagerTest {
expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00)).anyTimes();
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
-
+ expect(consumer.assignment()).andReturn(taskId00Partitions);
+ consumer.pause(taskId00Partitions);
+ expectLastCall();
+ final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
+ expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
+ consumer.seek(t1p0, offsetAndMetadata);
+ expectLastCall();
+ taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
@@ -611,6 +627,7 @@ public class TaskManagerTest {
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(stateManager);
+ verify(consumer);
}
@Test
@@ -633,9 +650,15 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer, changeLogReader);
consumer.commitSync(eq(emptyMap()));
-
+ expect(consumer.assignment()).andReturn(taskId00Partitions);
+ consumer.pause(taskId00Partitions);
+ expectLastCall();
+ final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
+ expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
+ consumer.seek(t1p0, offsetAndMetadata);
+ expectLastCall();
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
-
+ taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(), is(true));
@@ -645,6 +668,7 @@ public class TaskManagerTest {
taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
assertTrue(nonCorruptedTask.commitPrepared);
+ verify(consumer);
}
@Test
@@ -667,8 +691,16 @@ public class TaskManagerTest {
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
- replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
+ expect(consumer.assignment()).andReturn(taskId00Partitions);
+ consumer.pause(taskId00Partitions);
+ expectLastCall();
+ final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
+ expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
+ consumer.seek(t1p0, offsetAndMetadata);
+ expectLastCall();
+ replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
+ taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
taskManager.handleAssignment(assignment, emptyMap());
assertThat(nonRunningNonCorruptedTask.state(), is(Task.State.CREATED));
@@ -676,6 +708,7 @@ public class TaskManagerTest {
verify(activeTaskCreator);
assertFalse(nonRunningNonCorruptedTask.commitPrepared);
+ verify(consumer);
}
@Test
@@ -714,6 +747,7 @@ public class TaskManagerTest {
assertThrows(TaskMigratedException.class, () -> taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions)));
assertThat(corruptedStandby.state(), is(Task.State.CREATED));
+ verify(consumer);
}
@Test