You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/05 20:36:08 UTC

[kafka] branch trunk updated: [KAFKA-6730] Simplify State Store Recovery (#5013)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba0ebca  [KAFKA-6730] Simplify State Store Recovery (#5013)
ba0ebca is described below

commit ba0ebca7a516d4179b6327ddc60b0b49b1265347
Author: ConcurrencyPractitioner <yo...@gmail.com>
AuthorDate: Tue Jun 5 13:35:47 2018 -0700

    [KAFKA-6730] Simplify State Store Recovery (#5013)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 .../processor/internals/StoreChangelogReader.java  | 60 ++++++--------------
 .../streams/processor/internals/TaskManager.java   |  1 -
 .../internals/StoreChangelogReaderTest.java        | 65 ----------------------
 3 files changed, 16 insertions(+), 110 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5fcba76..af5ff47 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.slf4j.Logger;
 
@@ -50,6 +49,7 @@ public class StoreChangelogReader implements ChangelogReader {
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
+    private Map<TopicPartition, Long> updatedEndOffsets = new HashMap<>();
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
                                 final StateRestoreListener userStateRestoreListener,
@@ -66,12 +66,12 @@ public class StoreChangelogReader implements ChangelogReader {
         needsInitializing.put(restorer.partition(), restorer);
     }
 
-    /**
-     * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
-     */
     public Collection<TopicPartition> restore(final RestoringTasks active) {
         if (!needsInitializing.isEmpty()) {
             initialize();
+            final Set<TopicPartition> remainingPartitions = new HashSet<>(needsRestoring.keySet());
+            remainingPartitions.removeAll(updatedEndOffsets.keySet());
+            updatedEndOffsets.putAll(restoreConsumer.endOffsets(remainingPartitions));
         }
 
         if (needsRestoring.isEmpty()) {
@@ -79,11 +79,19 @@ public class StoreChangelogReader implements ChangelogReader {
             return completed();
         }
 
-        final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
         try {
-            final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
-            for (final TopicPartition partition : restoringPartitions) {
-                restorePartition(allRecords, partition, active.restoringTaskFor(partition));
+            final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(10);
+            final Iterator<TopicPartition> iterator = needsRestoring.keySet().iterator();
+            while (iterator.hasNext()) {
+                final TopicPartition partition = iterator.next();
+                final StateRestorer restorer = stateRestorers.get(partition);
+                final long pos = processNext(records.records(partition), restorer, updatedEndOffsets.get(partition));
+                restorer.setRestoredOffset(pos);
+                if (restorer.hasCompleted(pos, updatedEndOffsets.get(partition))) {
+                    restorer.restoreDone();
+                    updatedEndOffsets.remove(partition);
+                    iterator.remove();
+                }
             }
         } catch (final InvalidOffsetException recoverableException) {
             log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException);
@@ -240,41 +248,6 @@ public class StoreChangelogReader implements ChangelogReader {
         needsInitializing.clear();
     }
 
-    /**
-     * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
-     */
-    private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
-                                  final TopicPartition topicPartition,
-                                  final Task task) {
-        final StateRestorer restorer = stateRestorers.get(topicPartition);
-        final Long endOffset = endOffsets.get(topicPartition);
-        final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
-        restorer.setRestoredOffset(pos);
-        if (restorer.hasCompleted(pos, endOffset)) {
-            if (pos > endOffset) {
-                throw new TaskMigratedException(task, topicPartition, endOffset, pos);
-            }
-
-            // need to check for changelog topic
-            if (restorer.offsetLimit() == Long.MAX_VALUE) {
-                final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
-                if (!restorer.hasCompleted(pos, updatedEndOffset)) {
-                    throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos);
-                }
-            }
-
-
-            log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
-                      topicPartition,
-                      restorer.restoredNumRecords(),
-                      restorer.startingOffset(),
-                      restorer.restoredOffset());
-
-            restorer.restoreDone();
-            needsRestoring.remove(topicPartition);
-        }
-    }
-
     private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
                              final StateRestorer restorer,
                              final Long endOffset) {
@@ -326,7 +299,6 @@ public class StoreChangelogReader implements ChangelogReader {
                 return true;
             }
         }
-
         return false;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 6e6e4ca..44db70d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -309,7 +309,6 @@ public class TaskManager {
     /**
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
-     * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only)
      */
     boolean updateNewAndRestoringTasks() {
         active.initializeNewTasks();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index c65d4ef..aabe7ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.MockStateRestoreListener;
@@ -377,46 +376,6 @@ public class StoreChangelogReaderTest {
         assertThat(callbackTwo.restored.size(), equalTo(3));
     }
 
-    @Test
-    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopic() {
-        final int messages = 10;
-        setupConsumer(messages, topicPartition);
-        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
-
-        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
-        replay(active);
-
-        try {
-            changelogReader.restore(active);
-            fail("Should have thrown TaskMigratedException");
-        } catch (final TaskMigratedException expected) {
-            /* ignore */
-        }
-    }
-
-
-    @Test
-    public void shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck() {
-        final int messages = 10;
-        setupConsumer(messages, topicPartition);
-        // in this case first call to endOffsets returns correct value, but a second thread has updated the changelog topic
-        // so a subsequent call to endOffsets returns a value exceeding the expected end value
-        consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
-
-        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
-        replay(active);
-
-        try {
-            changelogReader.restore(active);
-            fail("Should have thrown TaskMigratedException");
-        } catch (final TaskMigratedException expected) {
-            // verifies second block threw exception with updated end offset
-            assertTrue(expected.getMessage().contains("end offset 15, current offset 10"));
-        }
-    }
-
 
     @Test
     public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() {
@@ -435,30 +394,6 @@ public class StoreChangelogReaderTest {
 
 
     @Test
-    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
-        final int totalMessages = 10;
-        assignPartition(totalMessages, topicPartition);
-        // records 0..4
-        addRecords(5, topicPartition, 0);
-        //EOS enabled commit marker at offset 5 so rest of records 6..10
-        addRecords(5, topicPartition, 6);
-        consumer.assign(Collections.<TopicPartition>emptyList());
-
-        // end offsets should start after commit marker of 5 from above
-        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
-
-        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
-        replay(active);
-        try {
-            changelogReader.restore(active);
-            fail("Should have thrown task migrated exception");
-        } catch (final TaskMigratedException expected) {
-            /* ignore */
-        }
-    }
-
-    @Test
     public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
         final int totalMessages = 10;
         setupConsumer(totalMessages, topicPartition);

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.