You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/16 06:01:58 UTC
[kafka] branch 1.0 updated: KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new 4d855ea KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist
4d855ea is described below
commit 4d855eae7a77074cf00c5aa56ed6c747917e4080
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Nov 15 22:01:50 2018 -0800
KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../streams/processor/internals/StoreChangelogReader.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34350c1..c03de2d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -59,9 +59,14 @@ public class StoreChangelogReader implements ChangelogReader {
@Override
public void register(final StateRestorer restorer) {
- restorer.setUserRestoreListener(userStateRestoreListener);
- stateRestorers.put(restorer.partition(), restorer);
- needsInitializing.put(restorer.partition(), restorer);
+ final StateRestorer existingRestorer = stateRestorers.get(restorer.partition());
+ if (existingRestorer == null) {
+ restorer.setUserRestoreListener(userStateRestoreListener);
+ stateRestorers.put(restorer.partition(), restorer);
+ needsInitializing.put(restorer.partition(), restorer);
+ } else {
+ needsInitializing.put(restorer.partition(), existingRestorer);
+ }
}
/**
@@ -188,7 +193,6 @@ public class StoreChangelogReader implements ChangelogReader {
restorer.setCheckpointOffset(consumer.position(restoringPartition));
task.reinitializeStateStoresForPartitions(restoringPartition);
- stateRestorers.get(restoringPartition).restoreStarted();
} else {
log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restoringPartition);