You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 23:54:54 UTC
[kafka] 04/06: KAFKA-6711: Add control for restoring in memory buckets
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 0bbdb80c4dc87557ff5e52ca8624d15568f8ba41
Author: Cemo <ce...@gmail.com>
AuthorDate: Wed Mar 28 20:14:41 2018 +0300
KAFKA-6711: Add control for restoring in memory buckets
---
.../kafka/streams/processor/internals/GlobalStateManagerImpl.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 7c5c874..fac1526 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -246,7 +246,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
for (final TopicPartition topicPartition : topicPartitions) {
globalConsumer.assign(Collections.singletonList(topicPartition));
final Long checkpoint = checkpointableOffsets.get(topicPartition);
- if (checkpoint != null) {
+ if (checkpoint != null && checkpoint > StateRestorer.NO_CHECKPOINT) {
globalConsumer.seek(topicPartition, checkpoint);
} else {
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.