You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/16 06:01:58 UTC

[kafka] branch 1.0 updated: KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 4d855ea  KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist
4d855ea is described below

commit 4d855eae7a77074cf00c5aa56ed6c747917e4080
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Nov 15 22:01:50 2018 -0800

    KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../streams/processor/internals/StoreChangelogReader.java    | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34350c1..c03de2d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -59,9 +59,14 @@ public class StoreChangelogReader implements ChangelogReader {
 
     @Override
     public void register(final StateRestorer restorer) {
-        restorer.setUserRestoreListener(userStateRestoreListener);
-        stateRestorers.put(restorer.partition(), restorer);
-        needsInitializing.put(restorer.partition(), restorer);
+        final StateRestorer existingRestorer = stateRestorers.get(restorer.partition());
+        if (existingRestorer == null) {
+            restorer.setUserRestoreListener(userStateRestoreListener);
+            stateRestorers.put(restorer.partition(), restorer);
+            needsInitializing.put(restorer.partition(), restorer);
+        } else {
+            needsInitializing.put(restorer.partition(), existingRestorer);
+        }
     }
 
     /**
@@ -188,7 +193,6 @@ public class StoreChangelogReader implements ChangelogReader {
                 restorer.setCheckpointOffset(consumer.position(restoringPartition));
 
                 task.reinitializeStateStoresForPartitions(restoringPartition);
-                stateRestorers.get(restoringPartition).restoreStarted();
             } else {
                 log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restoringPartition);