Posted to commits@kafka.apache.org by mj...@apache.org on 2020/07/11 18:48:12 UTC

[kafka] branch 2.6 updated: KAFKA-10247: Correctly reset state when task is corrupted (#8994)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 57a9797  KAFKA-10247: Correctly reset state when task is corrupted (#8994)
57a9797 is described below

commit 57a9797f578877f975987e518d190f9cc7bab18a
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sat Jul 11 13:39:23 2020 -0500

    KAFKA-10247: Correctly reset state when task is corrupted (#8994)
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../streams/processor/internals/StreamThread.java  |  69 +++++++++-----
 .../kafka/streams/processor/internals/Task.java    |   4 +
 .../streams/processor/internals/TaskManager.java   |  38 ++++++++
 .../processor/internals/StreamThreadTest.java      | 105 +++++++++++++++++++++
 .../processor/internals/TaskManagerTest.java       |  44 ++++++++-
 5 files changed, 232 insertions(+), 28 deletions(-)
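
In brief: when an active task throws a TaskCorruptedException, the task
manager now pauses the task's input partitions, rewinds any partition with a
committed offset back to it, and hands the rest to the stream thread's reset
logic before reviving the task. A condensed sketch of that sequence, pulled
together from the TaskManager hunk below; the class and helper names are
illustrative, and "resetter" stands in for the StreamThread::resetOffsets
callback that this patch installs:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import static org.apache.kafka.common.utils.Utils.intersection;

    final class CorruptionRecoverySketch {
        // hypothetical free-standing version of the logic in handleCorruption
        static void recover(final KafkaConsumer<byte[], byte[]> consumer,
                            final Set<TopicPartition> taskInputPartitions,
                            final Consumer<Set<TopicPartition>> resetter) {
            // only touch partitions this consumer is actually assigned
            final Set<TopicPartition> toReset =
                intersection(HashSet::new, consumer.assignment(), taskInputPartitions);
            // stop polling these partitions until the task is re-initialized
            consumer.pause(toReset);
            final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(toReset);
            for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
                if (entry.getValue() != null) {
                    // rewind to the last committed offset
                    consumer.seek(entry.getKey(), entry.getValue());
                    toReset.remove(entry.getKey());
                }
            }
            // anything left has no committed offset; apply the configured reset
            // policy, which throws if no policy is configured for a partition
            resetter.accept(toReset);
        }
    }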

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 838223b..833af28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -95,7 +95,7 @@ public class StreamThread extends Thread {
      *          |      | Assigned (3)| <----+
      *          |      +-----+-------+      |
      *          |            |              |
-     *          |            |              |
+     *          |            |--------------+
      *          |            v              |
      *          |      +-----+-------+      |
      *          |      | Running (4) | ---->+
@@ -137,7 +137,7 @@ public class StreamThread extends Thread {
         STARTING(2, 3, 5),                // 1
         PARTITIONS_REVOKED(2, 3, 5),      // 2
         PARTITIONS_ASSIGNED(2, 3, 4, 5),  // 3
-        RUNNING(2, 3, 5),                 // 4
+        RUNNING(2, 3, 4, 5),              // 4
         PENDING_SHUTDOWN(6),              // 5
         DEAD;                             // 6
 
@@ -359,9 +359,10 @@ public class StreamThread extends Thread {
         consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
         final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
         consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
-        String originalReset = null;
-        if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
-            originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+
+        final String originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+        // If there are any overrides, we never fall through to the consumer, but only handle offset management ourselves.
+        if (!builder.latestResetTopicsPattern().pattern().isEmpty() || !builder.earliestResetTopicsPattern().pattern().isEmpty()) {
             consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
         }
 
@@ -386,6 +387,8 @@ public class StreamThread extends Thread {
             nextScheduledRebalanceMs
         );
 
+        taskManager.setPartitionResetter(partitions -> streamThread.resetOffsets(partitions, null));
+
         return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
     }
 
@@ -648,7 +651,9 @@ public class StreamThread extends Thread {
 
         // only try to initialize the assigned tasks
         // if the state is still in PARTITION_ASSIGNED after the poll call
-        if (state == State.PARTITIONS_ASSIGNED) {
+        if (state == State.PARTITIONS_ASSIGNED
+            || state == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+
             // transitioning to restore-active is idempotent, so we can call it multiple times
             changelogReader.enforceRestoreActive();
 
@@ -760,17 +765,17 @@ public class StreamThread extends Thread {
         try {
             records = mainConsumer.poll(pollTime);
         } catch (final InvalidOffsetException e) {
-            resetInvalidOffsets(e);
+            resetOffsets(e.partitions(), e);
         }
 
         return records;
     }
 
-    private void resetInvalidOffsets(final InvalidOffsetException e) {
-        final Set<TopicPartition> partitions = e.partitions();
+    private void resetOffsets(final Set<TopicPartition> partitions, final Exception cause) {
         final Set<String> loggedTopics = new HashSet<>();
         final Set<TopicPartition> seekToBeginning = new HashSet<>();
         final Set<TopicPartition> seekToEnd = new HashSet<>();
+        final Set<TopicPartition> notReset = new HashSet<>();
 
         for (final TopicPartition partition : partitions) {
             if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
@@ -778,26 +783,44 @@ public class StreamThread extends Thread {
             } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
                 addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics);
             } else {
-                if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
-                    final String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
-                        " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
-                        "policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))";
-                    throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), e);
-                }
-
-                if (originalReset.equals("earliest")) {
+                if ("earliest".equals(originalReset)) {
                     addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
-                } else { // can only be "latest"
+                } else if ("latest".equals(originalReset)) {
                     addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
+                } else {
+                    notReset.add(partition);
                 }
             }
         }
 
-        if (!seekToBeginning.isEmpty()) {
-            mainConsumer.seekToBeginning(seekToBeginning);
-        }
-        if (!seekToEnd.isEmpty()) {
-            mainConsumer.seekToEnd(seekToEnd);
+        if (notReset.isEmpty()) {
+            if (!seekToBeginning.isEmpty()) {
+                mainConsumer.seekToBeginning(seekToBeginning);
+            }
+
+            if (!seekToEnd.isEmpty()) {
+                mainConsumer.seekToEnd(seekToEnd);
+            }
+        } else {
+            final String notResetString =
+                notReset.stream()
+                        .map(TopicPartition::topic)
+                        .distinct()
+                        .collect(Collectors.joining(","));
+
+            final String format = String.format(
+                "No valid committed offset found for input [%s] and no valid reset policy configured." +
+                    " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
+                    "policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or " +
+                    "StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))",
+                notResetString
+            );
+
+            if (cause == null) {
+                throw new StreamsException(format);
+            } else {
+                throw new StreamsException(format, cause);
+            }
         }
     }
 
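For anyone hitting the StreamsException above, the remedy is configuration
rather than anything in this patch: set a global "auto.offset.reset", or give
the affected topics their own policy. A minimal, self-contained example; the
topic name, serdes, and class name are illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;

    public class ResetPolicyExample {
        public static void main(final String[] args) {
            // Option 1: a topic-specific reset policy, as the error message suggests
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream(
                "input-topic",
                Consumed.with(Serdes.String(), Serdes.String())
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST)
            );

            // Option 2: a global default, forwarded to the main consumer
            final Properties props = new Properties();
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        }
    }
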
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 103c231..b6ed143 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -101,6 +101,10 @@ public interface Task {
 
     State state();
 
+    default boolean needsInitializationOrRestoration() {
+        return state() == State.CREATED || state() == State.RESTORING;
+    }
+
     boolean isActive();
 
     boolean isClosed();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c52ebdf..ae1dca5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.kafka.common.utils.Utils.intersection;
 import static org.apache.kafka.common.utils.Utils.union;
 import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
 import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;
@@ -88,6 +89,7 @@ public class TaskManager {
 
     // includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
     private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
+    private java.util.function.Consumer<Set<TopicPartition>> resetter;
 
     TaskManager(final ChangelogReader changelogReader,
                 final UUID processId,
@@ -193,6 +195,34 @@ public class TaskManager {
                 log.error("Error suspending corrupted task {} ", task.id(), swallow);
             }
             task.closeDirty();
+            if (task.isActive()) {
+                // Pause so we won't poll any more records for this task until it has been re-initialized
+                // Note: closeDirty already clears the partition group for the task.
+                final Set<TopicPartition> currentAssignment = mainConsumer().assignment();
+                final Set<TopicPartition> taskInputPartitions = task.inputPartitions();
+                final Set<TopicPartition> assignedToPauseAndReset =
+                    intersection(HashSet::new, currentAssignment, taskInputPartitions);
+                if (!assignedToPauseAndReset.equals(taskInputPartitions)) {
+                    log.warn(
+                        "Expected the current consumer assignment {} to contain the input partitions {}. " +
+                            "Will proceed to recover.",
+                        currentAssignment,
+                        taskInputPartitions
+                    );
+                }
+
+                mainConsumer().pause(assignedToPauseAndReset);
+                final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer().committed(assignedToPauseAndReset);
+                for (final Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committed.entrySet()) {
+                    final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue();
+                    if (offsetAndMetadata != null) {
+                        mainConsumer().seek(committedEntry.getKey(), offsetAndMetadata);
+                        assignedToPauseAndReset.remove(committedEntry.getKey());
+                    }
+                }
+                // throws if anything has no configured reset policy
+                resetter.accept(assignedToPauseAndReset);
+            }
             task.revive();
         }
     }
@@ -1140,4 +1170,12 @@ public class TaskManager {
             throw e; },
             e -> log.debug("Ignoring error in unclean {}", name));
     }
+
+    boolean needsInitializationOrRestoration() {
+        return tasks().values().stream().anyMatch(Task::needsInitializationOrRestoration);
+    }
+
+    public void setPartitionResetter(final java.util.function.Consumer<Set<TopicPartition>> resetter) {
+        this.resetter = resetter;
+    }
 }
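
The thread-side pieces connect like this: handleCorruption (above) revives
the corrupted task back to CREATED, and since RUNNING now allows a
self-transition, the run loop can drop back into restoration without a
rebalance. A simplified sketch of that loop condition, using names from the
patch; the real runOnce has more around it, and the catch site for
TaskCorruptedException sits outside the hunks shown here:

    // inside StreamThread#runOnce, simplified
    if (state == State.PARTITIONS_ASSIGNED
        || state == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
        // idempotent, so safe to call again for a revived task
        changelogReader.enforceRestoreActive();
        if (taskManager.tryToCompleteRestoration()) {
            // RUNNING -> RUNNING is now a legal self-transition
            setState(State.RUNNING);
        }
    }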
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c17dd69..a076e6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -67,6 +67,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockKeyValueStoreBuilder;
@@ -94,12 +95,14 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -1178,6 +1181,108 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldReinitializeRevivedTasksInAnyState() {
+        final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(false)), false);
+
+        final String storeName = "store";
+        final String storeChangelog = "stream-thread-test-store-changelog";
+        final TopicPartition storeChangelogTopicPartition = new TopicPartition(storeChangelog, 1);
+
+        internalTopologyBuilder.addSource(null, "name", null, null, null, topic1);
+        final AtomicBoolean shouldThrow = new AtomicBoolean(false);
+        final AtomicBoolean processed = new AtomicBoolean(false);
+        internalTopologyBuilder.addProcessor("proc", new ProcessorSupplier<Object, Object>() {
+            @Override
+            public Processor<Object, Object> get() {
+                return new Processor<Object, Object>() {
+                    private ProcessorContext context;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        this.context = context;
+                    }
+
+                    @Override
+                    public void process(final Object key, final Object value) {
+                        if (shouldThrow.get()) {
+                            throw new TaskCorruptedException(singletonMap(task1, new HashSet<TopicPartition>(singleton(storeChangelogTopicPartition))));
+                        } else {
+                            processed.set(true);
+                        }
+                    }
+
+                    @Override
+                    public void close() {
+
+                    }
+                };
+            }
+        }, "name");
+        internalTopologyBuilder.addStateStore(
+            Stores.keyValueStoreBuilder(
+                Stores.persistentKeyValueStore(storeName),
+                Serdes.String(),
+                Serdes.String()
+            ),
+            "proc"
+        );
+
+        thread.setState(StreamThread.State.STARTING);
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+
+        thread.taskManager().handleAssignment(activeTasks, emptyMap());
+
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(mkMap(
+            mkEntry(t1p1, 0L)
+        ));
+
+        final MockConsumer<byte[], byte[]> restoreConsumer = (MockConsumer<byte[], byte[]>) thread.restoreConsumer();
+        restoreConsumer.updateBeginningOffsets(mkMap(
+            mkEntry(storeChangelogTopicPartition, 0L)
+        ));
+        final MockAdminClient admin = (MockAdminClient) thread.adminClient();
+        admin.updateEndOffsets(singletonMap(storeChangelogTopicPartition, 0L));
+
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
+
+
+        // the first iteration completes the restoration
+        thread.runOnce();
+        assertThat(thread.activeTasks().size(), equalTo(1));
+
+        // the second iteration transitions to running and resumes the input partitions
+        thread.runOnce();
+
+        // the third actually polls, processes the record, and throws the corruption exception
+        addRecord(mockConsumer, 0L);
+        shouldThrow.set(true);
+        final TaskCorruptedException taskCorruptedException = assertThrows(TaskCorruptedException.class, thread::runOnce);
+
+        // Now, we can handle the corruption
+        thread.taskManager().handleCorruption(taskCorruptedException.corruptedTaskWithChangelogs());
+
+        // again, complete the restoration
+        thread.runOnce();
+        // transit to running and unpause
+        thread.runOnce();
+        // process the record
+        addRecord(mockConsumer, 0L);
+        shouldThrow.set(false);
+        assertThat(processed.get(), is(false));
+        thread.runOnce();
+        assertThat(processed.get(), is(true));
+    }
+
+    @Test
     public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting() {
         // only have source but no sink so that we would not get fenced in producer.send
         internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 36730b5..d7ea5ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -92,6 +92,7 @@ import static org.easymock.EasyMock.resetToStrict;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
@@ -566,8 +567,15 @@ public class TaskManagerTest {
         topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
         expectLastCall().anyTimes();
 
+        expect(consumer.assignment()).andReturn(taskId00Partitions);
+        consumer.pause(taskId00Partitions);
+        expectLastCall();
+        final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
+        expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
+        consumer.seek(t1p0, offsetAndMetadata);
+        expectLastCall();
         replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
-
+        taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
@@ -578,6 +586,7 @@ public class TaskManagerTest {
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
 
         verify(stateManager);
+        verify(consumer);
     }
 
     @Test
@@ -598,7 +607,14 @@ public class TaskManagerTest {
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00)).anyTimes();
         topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
         expectLastCall().anyTimes();
-
+        expect(consumer.assignment()).andReturn(taskId00Partitions);
+        consumer.pause(taskId00Partitions);
+        expectLastCall();
+        final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
+        expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
+        consumer.seek(t1p0, offsetAndMetadata);
+        expectLastCall();
+        taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
         replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
@@ -611,6 +627,7 @@ public class TaskManagerTest {
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
 
         verify(stateManager);
+        verify(consumer);
     }
 
     @Test
@@ -633,9 +650,15 @@ public class TaskManagerTest {
 
         expectRestoreToBeCompleted(consumer, changeLogReader);
         consumer.commitSync(eq(emptyMap()));
-
+        expect(consumer.assignment()).andReturn(taskId00Partitions);
+        consumer.pause(taskId00Partitions);
+        expectLastCall();
+        final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
+        expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
+        consumer.seek(t1p0, offsetAndMetadata);
+        expectLastCall();
         replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
-
+        taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
         taskManager.handleAssignment(assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(), is(true));
 
@@ -645,6 +668,7 @@ public class TaskManagerTest {
         taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
 
         assertTrue(nonCorruptedTask.commitPrepared);
+        verify(consumer);
     }
 
     @Test
@@ -667,8 +691,16 @@ public class TaskManagerTest {
         topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
         expectLastCall().anyTimes();
 
-        replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
+        expect(consumer.assignment()).andReturn(taskId00Partitions);
+        consumer.pause(taskId00Partitions);
+        expectLastCall();
+        final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
+        expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
+        consumer.seek(t1p0, offsetAndMetadata);
+        expectLastCall();
 
+        replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
+        taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
         taskManager.handleAssignment(assignment, emptyMap());
         assertThat(nonRunningNonCorruptedTask.state(), is(Task.State.CREATED));
 
@@ -676,6 +708,7 @@ public class TaskManagerTest {
 
         verify(activeTaskCreator);
         assertFalse(nonRunningNonCorruptedTask.commitPrepared);
+        verify(consumer);
     }
 
     @Test
@@ -714,6 +747,7 @@ public class TaskManagerTest {
         assertThrows(TaskMigratedException.class, () -> taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions)));
 
         assertThat(corruptedStandby.state(), is(Task.State.CREATED));
+        verify(consumer);
     }
 
     @Test