You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/05 20:36:08 UTC
[kafka] branch trunk updated: [KAFKA-6730] Simplify State Store Recovery (#5013)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ba0ebca [KAFKA-6730] Simplify State Store Recovery (#5013)
ba0ebca is described below
commit ba0ebca7a516d4179b6327ddc60b0b49b1265347
Author: ConcurrencyPractitioner <yo...@gmail.com>
AuthorDate: Tue Jun 5 13:35:47 2018 -0700
[KAFKA-6730] Simplify State Store Recovery (#5013)
Reviewer: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
.../processor/internals/StoreChangelogReader.java | 60 ++++++--------------
.../streams/processor/internals/TaskManager.java | 1 -
.../internals/StoreChangelogReaderTest.java | 65 ----------------------
3 files changed, 16 insertions(+), 110 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5fcba76..af5ff47 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;
@@ -50,6 +49,7 @@ public class StoreChangelogReader implements ChangelogReader {
private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
+ private Map<TopicPartition, Long> updatedEndOffsets = new HashMap<>();
public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
final StateRestoreListener userStateRestoreListener,
@@ -66,12 +66,12 @@ public class StoreChangelogReader implements ChangelogReader {
needsInitializing.put(restorer.partition(), restorer);
}
- /**
- * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
- */
public Collection<TopicPartition> restore(final RestoringTasks active) {
if (!needsInitializing.isEmpty()) {
initialize();
+ final Set<TopicPartition> remainingPartitions = new HashSet<>(needsRestoring.keySet());
+ remainingPartitions.removeAll(updatedEndOffsets.keySet());
+ updatedEndOffsets.putAll(restoreConsumer.endOffsets(remainingPartitions));
}
if (needsRestoring.isEmpty()) {
@@ -79,11 +79,19 @@ public class StoreChangelogReader implements ChangelogReader {
return completed();
}
- final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
try {
- final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
- for (final TopicPartition partition : restoringPartitions) {
- restorePartition(allRecords, partition, active.restoringTaskFor(partition));
+ final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(10);
+ final Iterator<TopicPartition> iterator = needsRestoring.keySet().iterator();
+ while (iterator.hasNext()) {
+ final TopicPartition partition = iterator.next();
+ final StateRestorer restorer = stateRestorers.get(partition);
+ final long pos = processNext(records.records(partition), restorer, updatedEndOffsets.get(partition));
+ restorer.setRestoredOffset(pos);
+ if (restorer.hasCompleted(pos, updatedEndOffsets.get(partition))) {
+ restorer.restoreDone();
+ updatedEndOffsets.remove(partition);
+ iterator.remove();
+ }
}
} catch (final InvalidOffsetException recoverableException) {
log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException);
@@ -240,41 +248,6 @@ public class StoreChangelogReader implements ChangelogReader {
needsInitializing.clear();
}
- /**
- * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
- */
- private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
- final TopicPartition topicPartition,
- final Task task) {
- final StateRestorer restorer = stateRestorers.get(topicPartition);
- final Long endOffset = endOffsets.get(topicPartition);
- final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
- restorer.setRestoredOffset(pos);
- if (restorer.hasCompleted(pos, endOffset)) {
- if (pos > endOffset) {
- throw new TaskMigratedException(task, topicPartition, endOffset, pos);
- }
-
- // need to check for changelog topic
- if (restorer.offsetLimit() == Long.MAX_VALUE) {
- final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
- if (!restorer.hasCompleted(pos, updatedEndOffset)) {
- throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos);
- }
- }
-
-
- log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
- topicPartition,
- restorer.restoredNumRecords(),
- restorer.startingOffset(),
- restorer.restoredOffset());
-
- restorer.restoreDone();
- needsRestoring.remove(topicPartition);
- }
- }
-
private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
final StateRestorer restorer,
final Long endOffset) {
@@ -326,7 +299,6 @@ public class StoreChangelogReader implements ChangelogReader {
return true;
}
}
-
return false;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 6e6e4ca..44db70d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -309,7 +309,6 @@ public class TaskManager {
/**
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
- * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only)
*/
boolean updateNewAndRestoringTasks() {
active.initializeNewTasks();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index c65d4ef..aabe7ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.MockStateRestoreListener;
@@ -377,46 +376,6 @@ public class StoreChangelogReaderTest {
assertThat(callbackTwo.restored.size(), equalTo(3));
}
- @Test
- public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopic() {
- final int messages = 10;
- setupConsumer(messages, topicPartition);
- consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));
- changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
-
- expect(active.restoringTaskFor(topicPartition)).andReturn(task);
- replay(active);
-
- try {
- changelogReader.restore(active);
- fail("Should have thrown TaskMigratedException");
- } catch (final TaskMigratedException expected) {
- /* ignore */
- }
- }
-
-
- @Test
- public void shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck() {
- final int messages = 10;
- setupConsumer(messages, topicPartition);
- // in this case first call to endOffsets returns correct value, but a second thread has updated the changelog topic
- // so a subsequent call to endOffsets returns a value exceeding the expected end value
- consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
- changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
-
- expect(active.restoringTaskFor(topicPartition)).andReturn(task);
- replay(active);
-
- try {
- changelogReader.restore(active);
- fail("Should have thrown TaskMigratedException");
- } catch (final TaskMigratedException expected) {
- // verifies second block threw exception with updated end offset
- assertTrue(expected.getMessage().contains("end offset 15, current offset 10"));
- }
- }
-
@Test
public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() {
@@ -435,30 +394,6 @@ public class StoreChangelogReaderTest {
@Test
- public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
- final int totalMessages = 10;
- assignPartition(totalMessages, topicPartition);
- // records 0..4
- addRecords(5, topicPartition, 0);
- //EOS enabled commit marker at offset 5 so rest of records 6..10
- addRecords(5, topicPartition, 6);
- consumer.assign(Collections.<TopicPartition>emptyList());
-
- // end offsets should start after commit marker of 5 from above
- consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L));
- changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
-
- expect(active.restoringTaskFor(topicPartition)).andReturn(task);
- replay(active);
- try {
- changelogReader.restore(active);
- fail("Should have thrown task migrated exception");
- } catch (final TaskMigratedException expected) {
- /* ignore */
- }
- }
-
- @Test
public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
final int totalMessages = 10;
setupConsumer(totalMessages, topicPartition);
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.