You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 23:54:54 UTC

[kafka] 04/06: KAFKA-6711: Add a check for restoring in-memory buckets

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 0bbdb80c4dc87557ff5e52ca8624d15568f8ba41
Author: Cemo <ce...@gmail.com>
AuthorDate: Wed Mar 28 20:14:41 2018 +0300

    KAFKA-6711: Add a check for restoring in-memory buckets
---
 .../kafka/streams/processor/internals/GlobalStateManagerImpl.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 7c5c874..fac1526 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -246,7 +246,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
             final Long checkpoint = checkpointableOffsets.get(topicPartition);
-            if (checkpoint != null) {
+            if (checkpoint != null && checkpoint > StateRestorer.NO_CHECKPOINT) {
                 globalConsumer.seek(topicPartition, checkpoint);
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.