You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/09/14 19:05:32 UTC
[kafka] branch 0.11.0 updated: KAFKA-7192: Wipe out state store if
EOS is turned on and checkpoint file does not exist (#5641)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push:
new 4c9d49b KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist (#5641)
4c9d49b is described below
commit 4c9d49bd3bb8381e95bba4e3223c0d6a3c3c8e22
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Sep 14 12:05:24 2018 -0700
KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist (#5641)
Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../internals/AbstractProcessorContext.java | 6 ++
.../streams/processor/internals/AbstractTask.java | 4 ++
.../streams/processor/internals/AssignedTasks.java | 2 +-
.../processor/internals/ChangelogReader.java | 2 +-
.../internals/InternalProcessorContext.java | 5 ++
.../processor/internals/ProcessorStateManager.java | 63 +++++++++++++++++++-
.../streams/processor/internals/StateRestorer.java | 19 ++++--
.../processor/internals/StoreChangelogReader.java | 64 ++++++++++++++------
.../streams/processor/internals/StreamThread.java | 2 +-
.../streams/integration/EosIntegrationTest.java | 4 +-
.../processor/internals/StateRestorerTest.java | 4 +-
.../internals/StoreChangelogReaderTest.java | 68 +++++++++++-----------
.../org/apache/kafka/test/MockChangelogReader.java | 3 +-
.../apache/kafka/test/MockProcessorContext.java | 3 +
14 files changed, 184 insertions(+), 65 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 04af9f2..c094b83 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -193,4 +193,10 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
public void initialized() {
initialized = true;
}
+
+ @Override
+ public void uninitialize() {
+ initialized = false;
+ }
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 7f6ac7c..f8f6416 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -226,6 +226,10 @@ public abstract class AbstractTask {
}
}
+ void reinitializeStateStoresForPartitions(final TopicPartition partitions) {
+ stateMgr.reinitializeStateStoresForPartitions(partitions, processorContext);
+ }
+
/**
* @throws ProcessorStateException if there is an error while closing the state manager
* @param writeCheckpoint boolean indicating if a checkpoint file should be written
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index d59ec2b..ad4868f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -298,7 +298,7 @@ class AssignedTasks<T extends AbstractTask> {
return suspended.values();
}
- Collection<T> restoringTasks() {
+ public Collection<T> restoringTasks() {
return Collections.unmodifiableCollection(restoring.values());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 5ebc34c..e82ee2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -37,7 +37,7 @@ public interface ChangelogReader {
* Restore all registered state stores by reading from their changelogs.
* @return all topic partitions that have been restored
*/
- Collection<TopicPartition> restore();
+ Collection<TopicPartition> restore(final Collection<StreamTask> restoringTasks);
/**
* @return the restored offsets for all persistent stores.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 57bb3ac..b5719b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -53,4 +53,9 @@ public interface InternalProcessorContext extends ProcessorContext {
* Mark this context as being initialized
*/
void initialized();
+
+ /**
+ * Mark this context as being uninitialized
+ */
+ void uninitialize();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 34a87ce..93e7ffc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
@@ -33,9 +34,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class ProcessorStateManager implements StateManager {
@@ -50,6 +53,7 @@ public class ProcessorStateManager implements StateManager {
private final String logPrefix;
private final boolean isStandby;
private final ChangelogReader changelogReader;
+ private final boolean eosEnabled;
private final Map<String, StateStore> stores;
private final Map<String, StateStore> globalStores;
private final Map<TopicPartition, Long> offsetLimits;
@@ -106,6 +110,7 @@ public class ProcessorStateManager implements StateManager {
checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
checkpointedOffsets = new HashMap<>(checkpoint.read());
+ this.eosEnabled = eosEnabled;
if (eosEnabled) {
// delete the checkpoint file after finish loading its stored offsets
checkpoint.delete();
@@ -169,7 +174,8 @@ public class ProcessorStateManager implements StateManager {
stateRestoreCallback,
checkpointedOffsets.get(storePartition),
offsetLimit(storePartition),
- store.persistent()
+ store.persistent(),
+ store.name()
);
changelogReader.register(restorer);
@@ -178,6 +184,61 @@ public class ProcessorStateManager implements StateManager {
stores.put(store.name(), store);
}
+ void reinitializeStateStoresForPartitions(final TopicPartition topicPartition,
+ final InternalProcessorContext processorContext) {
+ final Map<String, String> changelogTopicToStore = inverseOneToOneMap(storeToChangelogTopic);
+ final Set<String> storeToBeReinitialized = new HashSet<>();
+ final Map<String, StateStore> storesCopy = new HashMap<>(stores);
+
+ checkpointedOffsets.remove(topicPartition);
+ storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
+
+ if (!eosEnabled) {
+ try {
+ checkpoint.write(checkpointedOffsets);
+ } catch (final IOException fatalException) {
+ log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stores, fatalException);
+ throw new StreamsException("Failed to reinitialize stores.", fatalException);
+ }
+ }
+
+ for (final Map.Entry<String, StateStore> entry : storesCopy.entrySet()) {
+ final StateStore stateStore = entry.getValue();
+ final String storeName = stateStore.name();
+ if (storeToBeReinitialized.contains(storeName)) {
+ try {
+ stateStore.close();
+ } catch (final RuntimeException ignoreAndSwallow) { /* ignore */ }
+ processorContext.uninitialize();
+ stores.remove(entry.getKey());
+
+ try {
+ Utils.delete(new File(baseDir + File.separator + "rocksdb" + File.separator + storeName));
+ } catch (final IOException fatalException) {
+ log.error("Failed to reinitialize store {}.", storeName, fatalException);
+ throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException);
+ }
+
+ try {
+ Utils.delete(new File(baseDir + File.separator + storeName));
+ } catch (final IOException fatalException) {
+ log.error("Failed to reinitialize store {}.", storeName, fatalException);
+ throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException);
+ }
+
+ stateStore.init(processorContext, stateStore);
+ }
+ }
+ }
+
+ private Map<String, String> inverseOneToOneMap(final Map<String, String> origin) {
+ final Map<String, String> reversedMap = new HashMap<>();
+ for (final Map.Entry<String, String> entry : origin.entrySet()) {
+ reversedMap.put(entry.getValue(), entry.getKey());
+ }
+ return reversedMap;
+ }
+
@Override
public Map<TopicPartition, Long> checkpointed() {
final Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 79bfd1d..3c5efe8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -22,12 +22,13 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
public class StateRestorer {
static final int NO_CHECKPOINT = -1;
- private final Long checkpoint;
private final long offsetLimit;
private final boolean persistent;
+ private final String storeName;
private final TopicPartition partition;
private final StateRestoreCallback stateRestoreCallback;
+ private long checkpointOffset;
private long restoredOffset;
private long startingOffset;
@@ -35,12 +36,14 @@ public class StateRestorer {
final StateRestoreCallback stateRestoreCallback,
final Long checkpoint,
final long offsetLimit,
- final boolean persistent) {
+ final boolean persistent,
+ final String storeName) {
this.partition = partition;
this.stateRestoreCallback = stateRestoreCallback;
- this.checkpoint = checkpoint;
+ this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
this.offsetLimit = offsetLimit;
this.persistent = persistent;
+ this.storeName = storeName;
}
public TopicPartition partition() {
@@ -48,7 +51,15 @@ public class StateRestorer {
}
long checkpoint() {
- return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+ return checkpointOffset;
+ }
+
+ void setCheckpointOffset(final long checkpointOffset) {
+ this.checkpointOffset = checkpointOffset;
+ }
+
+ public String storeName() {
+ return storeName;
}
void restore(final byte[] key, final byte[] value) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34dcb75..305bf10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -60,13 +60,16 @@ public class StoreChangelogReader implements ChangelogReader {
@Override
public void register(final StateRestorer restorer) {
- stateRestorers.put(restorer.partition(), restorer);
+ if (!stateRestorers.containsKey(restorer.partition())) {
+ stateRestorers.put(restorer.partition(), restorer);
+ log.trace("Added restorer for changelog {}", restorer.partition());
+ }
needsInitializing.put(restorer.partition(), restorer);
}
- public Collection<TopicPartition> restore() {
+ public Collection<TopicPartition> restore(final Collection<StreamTask> restoringTasks) {
if (!needsInitializing.isEmpty()) {
- initialize();
+ initialize(restoringTasks);
}
if (needsRestoring.isEmpty()) {
@@ -87,7 +90,7 @@ public class StoreChangelogReader implements ChangelogReader {
return completed();
}
- private void initialize() {
+ private void initialize(final Collection<StreamTask> restoringTasks) {
if (!consumer.subscription().isEmpty()) {
throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + consumer.subscription() + ")");
}
@@ -139,11 +142,12 @@ public class StoreChangelogReader implements ChangelogReader {
// set up restorer for those initializable
if (!initializable.isEmpty()) {
- startRestoration(initializable);
+ startRestoration(initializable, restoringTasks);
}
}
- private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
+ private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
+ final Collection<StreamTask> restoringTasks) {
log.debug("{} Start restoring state stores from changelog topics {}", logPrefix, initialized.keySet());
final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());
@@ -152,24 +156,48 @@ public class StoreChangelogReader implements ChangelogReader {
final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
for (final StateRestorer restorer : initialized.values()) {
+ final TopicPartition restoringPartition = restorer.partition();
if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
- consumer.seek(restorer.partition(), restorer.checkpoint());
- logRestoreOffsets(restorer.partition(),
- restorer.checkpoint(),
- endOffsets.get(restorer.partition()));
- restorer.setStartingOffset(consumer.position(restorer.partition()));
+ consumer.seek(restoringPartition, restorer.checkpoint());
+ logRestoreOffsets(
+ restoringPartition,
+ restorer.checkpoint(),
+ endOffsets.get(restoringPartition));
+ restorer.setStartingOffset(consumer.position(restoringPartition));
} else {
- consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+ consumer.seekToBeginning(Collections.singletonList(restoringPartition));
needsPositionUpdate.add(restorer);
}
}
for (final StateRestorer restorer : needsPositionUpdate) {
- final long position = consumer.position(restorer.partition());
- logRestoreOffsets(restorer.partition(),
- position,
- endOffsets.get(restorer.partition()));
- restorer.setStartingOffset(position);
+ final TopicPartition restoringPartition = restorer.partition();
+
+ for (final StreamTask task : restoringTasks) {
+ if (task.changelogPartitions().contains(restoringPartition) || task.partitions().contains(restoringPartition)) {
+ if (task.eosEnabled) {
+ log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
+ "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restorer.partition());
+
+ needsInitializing.remove(restoringPartition);
+ initialized.put(restoringPartition, restorer);
+ restorer.setCheckpointOffset(consumer.position(restoringPartition));
+
+ task.reinitializeStateStoresForPartitions(restoringPartition);
+ } else {
+ log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restorer.partition());
+
+ final long position = consumer.position(restoringPartition);
+ logRestoreOffsets(
+ restoringPartition,
+ position,
+ endOffsets.get(restoringPartition));
+ restorer.setStartingOffset(position);
+ }
+ }
+ }
+
+
}
needsRestoring.putAll(initialized);
@@ -220,7 +248,7 @@ public class StoreChangelogReader implements ChangelogReader {
}
private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
- final TopicPartition topicPartition) {
+ final TopicPartition topicPartition) {
final StateRestorer restorer = stateRestorers.get(topicPartition);
final Long endOffset = endOffsets.get(topicPartition);
final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 210b070..6b7f4f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -532,7 +532,7 @@ public class StreamThread extends Thread {
active.initializeNewTasks();
standby.initializeNewTasks();
- final Collection<TopicPartition> restored = storeChangelogReader.restore();
+ final Collection<TopicPartition> restored = storeChangelogReader.restore(active.restoringTasks());
final Set<TopicPartition> resumed = active.updateRestored(restored);
if (!resumed.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 0c3b36a..7f70950 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -392,7 +392,7 @@ public class EosIntegrationTest {
// the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
// and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
//
- // the failure gets inject after 20 committed and 30 uncommitted records got received
+ // the failure gets inject after 20 committed and 10 uncommitted records got received
// -> the failure only kills one thread
// after fail over, we should read 40 committed records and the state stores should contain the correct sums
// per key (even if some records got processed twice)
@@ -402,7 +402,7 @@ public class EosIntegrationTest {
streams.start();
final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
- final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
+ final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);
final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
dataBeforeFailure.addAll(committedDataBeforeFailure);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index 6968f33..3a8ebda 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -28,7 +28,7 @@ public class StateRestorerTest {
private static final long OFFSET_LIMIT = 50;
private final MockRestoreCallback callback = new MockRestoreCallback();
- private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true);
+ private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true, "store");
@Test
public void shouldCallRestoreOnRestoreCallback() throws Exception {
@@ -53,7 +53,7 @@ public class StateRestorerTest {
@Test
public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() throws Exception {
- final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, 0, true);
+ final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, 0, true, "store");
assertTrue(restorer.hasCompleted(0, 10));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index a43f083..24820f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -59,8 +59,8 @@ public class StoreChangelogReaderTest {
};
final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer);
- changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
- changelogReader.restore();
+ changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertTrue(functionCalled.get());
}
@@ -68,7 +68,7 @@ public class StoreChangelogReaderTest {
public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Exception {
consumer.subscribe(Collections.singleton("sometopic"));
try {
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
fail("Should have thrown IllegalStateException");
} catch (final IllegalStateException e) {
// ok
@@ -79,9 +79,9 @@ public class StoreChangelogReaderTest {
public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() throws Exception {
final int messages = 10;
setupConsumer(messages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
+ changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored.size(), equalTo(messages));
}
@@ -89,9 +89,9 @@ public class StoreChangelogReaderTest {
public void shouldRestoreMessagesFromCheckpoint() throws Exception {
final int messages = 10;
setupConsumer(messages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, callback, 5L, Long.MAX_VALUE, true));
+ changelogReader.register(new StateRestorer(topicPartition, callback, 5L, Long.MAX_VALUE, true, "store"));
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored.size(), equalTo(5));
}
@@ -99,18 +99,18 @@ public class StoreChangelogReaderTest {
public void shouldClearAssignmentAtEndOfRestore() throws Exception {
final int messages = 1;
setupConsumer(messages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
+ changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet()));
}
@Test
public void shouldRestoreToLimitWhenSupplied() throws Exception {
setupConsumer(10, topicPartition);
- final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, 3, true);
+ final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, 3, true, "store");
changelogReader.register(restorer);
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored.size(), equalTo(3));
assertThat(restorer.restoredOffset(), equalTo(3L));
}
@@ -125,11 +125,11 @@ public class StoreChangelogReaderTest {
setupConsumer(5, one);
setupConsumer(3, two);
- changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
- changelogReader.register(new StateRestorer(one, callbackOne, null, Long.MAX_VALUE, true));
- changelogReader.register(new StateRestorer(two, callbackTwo, null, Long.MAX_VALUE, true));
+ changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
+ changelogReader.register(new StateRestorer(one, callbackOne, null, Long.MAX_VALUE, true, "store"));
+ changelogReader.register(new StateRestorer(two, callbackTwo, null, Long.MAX_VALUE, true, "store"));
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored.size(), equalTo(10));
assertThat(callbackOne.restored.size(), equalTo(5));
@@ -138,11 +138,11 @@ public class StoreChangelogReaderTest {
@Test
public void shouldNotRestoreAnythingWhenPartitionIsEmpty() throws Exception {
- final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true);
+ final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store");
setupConsumer(0, topicPartition);
changelogReader.register(restorer);
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored.size(), equalTo(0));
assertThat(restorer.restoredOffset(), equalTo(0L));
}
@@ -151,11 +151,11 @@ public class StoreChangelogReaderTest {
public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws Exception {
final Long endOffset = 10L;
setupConsumer(endOffset, topicPartition);
- final StateRestorer restorer = new StateRestorer(topicPartition, callback, endOffset, Long.MAX_VALUE, true);
+ final StateRestorer restorer = new StateRestorer(topicPartition, callback, endOffset, Long.MAX_VALUE, true, "store");
changelogReader.register(restorer);
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored.size(), equalTo(0));
assertThat(restorer.restoredOffset(), equalTo(endOffset));
}
@@ -163,8 +163,8 @@ public class StoreChangelogReaderTest {
@Test
public void shouldReturnRestoredOffsetsForPersistentStores() throws Exception {
setupConsumer(10, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
- changelogReader.restore();
+ changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
+ changelogReader.restore(Collections.<StreamTask>emptySet());
final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
}
@@ -172,8 +172,8 @@ public class StoreChangelogReaderTest {
@Test
public void shouldNotReturnRestoredOffsetsForNonPersistentStore() throws Exception {
setupConsumer(10, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
- changelogReader.restore();
+ changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store"));
+ changelogReader.restore(Collections.<StreamTask>emptySet());
final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
}
@@ -186,8 +186,8 @@ public class StoreChangelogReaderTest {
consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 1, (byte[]) null, bytes));
consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 2, bytes, bytes));
consumer.assign(Collections.singletonList(topicPartition));
- changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
- changelogReader.restore();
+ changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store"));
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));
}
@@ -200,15 +200,15 @@ public class StoreChangelogReaderTest {
consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), i, bytes, bytes));
}
consumer.assign(Collections.singletonList(topicPartition));
- changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
+ changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store"));
- final Collection<TopicPartition> completedFirstTime = changelogReader.restore();
+ final Collection<TopicPartition> completedFirstTime = changelogReader.restore(Collections.<StreamTask>emptySet());
assertTrue(completedFirstTime.isEmpty());
for (int i = 5; i < 10; i++) {
consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), i, bytes, bytes));
}
final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
- assertThat(changelogReader.restore(), equalTo(expected));
+ assertThat(changelogReader.restore(Collections.<StreamTask>emptySet()), equalTo(expected));
}
private void setupConsumer(final long messages, final TopicPartition topicPartition) {
@@ -237,8 +237,8 @@ public class StoreChangelogReaderTest {
public void shouldCompleteImmediatelyWhenEndOffsetIs0() {
final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
setupConsumer(0, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
- final Collection<TopicPartition> restored = changelogReader.restore();
+ changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
+ final Collection<TopicPartition> restored = changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(restored, equalTo(expected));
}
@@ -248,9 +248,9 @@ public class StoreChangelogReaderTest {
setupConsumer(1, topicPartition);
consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
- changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
+ changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store"));
- assertTrue(changelogReader.restore().isEmpty());
+ assertTrue(changelogReader.restore(Collections.<StreamTask>emptySet()).isEmpty());
addRecords(9, topicPartition, 1);
@@ -259,12 +259,12 @@ public class StoreChangelogReaderTest {
consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L));
consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));
- changelogReader.register(new StateRestorer(postInitialization, callbackTwo, null, Long.MAX_VALUE, false));
+ changelogReader.register(new StateRestorer(postInitialization, callbackTwo, null, Long.MAX_VALUE, false, "store"));
final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
consumer.assign(expected);
- assertThat(changelogReader.restore(), equalTo(expected));
+ assertThat(changelogReader.restore(Collections.<StreamTask>emptySet()), equalTo(expected));
assertThat(callback.restored.size(), equalTo(10));
assertThat(callbackTwo.restored.size(), equalTo(3));
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
index 54fd858..93ce801 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
@@ -19,6 +19,7 @@ package org.apache.kafka.test;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.StateRestorer;
+import org.apache.kafka.streams.processor.internals.StreamTask;
import java.util.Collection;
import java.util.Collections;
@@ -35,7 +36,7 @@ public class MockChangelogReader implements ChangelogReader {
}
@Override
- public Collection<TopicPartition> restore() {
+ public Collection<TopicPartition> restore(final Collection<StreamTask> restoringTasks) {
return registered;
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index cb56fa1..fed8752 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -143,6 +143,9 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
public void initialized() {}
@Override
+ public void uninitialize() {}
+
+ @Override
public File stateDir() {
if (stateDir == null) {
throw new UnsupportedOperationException("State directory not specified");