You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 23:54:53 UTC
[kafka] 03/06: KAFKA-6711: Write global stores' offsets as -1
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit b5c5fc37e9d374ecc80c70ba48c0c6e3b4066400
Author: Cemo <ce...@gmail.com>
AuthorDate: Wed Mar 28 15:32:39 2018 +0300
KAFKA-6711: Write global stores' offsets as -1
---
.../kafka/streams/processor/internals/GlobalStateManagerImpl.java | 2 +-
.../kafka/streams/processor/internals/GlobalStateManagerImplTest.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 26cfcf7..7c5c874 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -355,7 +355,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
final String topic = topicPartitionOffset.getKey().topic();
if (globalNonPersistentStoresTopics.contains(topic)) {
- log.debug("Skipping global store' topic {}", topic);
+ filteredOffsets.put(topicPartitionOffset.getKey(), (long) StateRestorer.NO_CHECKPOINT);
} else {
filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 7769c0a..19af5f4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -495,7 +495,7 @@ public class GlobalStateManagerImplTest {
stateManager.register(store3, stateRestoreCallback);
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
- assertThat(readOffsetsCheckpoint(), equalTo(Collections.<TopicPartition, Long>emptyMap()));
+ assertThat(readOffsetsCheckpoint(), equalTo(Collections.singletonMap(t3, (long) StateRestorer.NO_CHECKPOINT)));
}
private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.