You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 23:54:51 UTC

[kafka] 01/06: KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e1956f5e979938909abc65c82b10199b5abdcec7
Author: Cemo <ce...@gmail.com>
AuthorDate: Tue Mar 27 23:17:33 2018 +0300

    KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
---
 .../internals/GlobalStateManagerImpl.java          | 28 ++++++++++++++++++++--
 .../internals/GlobalStateManagerImplTest.java      | 24 +++++++++++++++++++
 .../org/apache/kafka/test/NoOpReadOnlyStore.java   |  2 +-
 3 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 78c4a36..f9f5878 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -42,6 +42,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -336,10 +337,33 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
 
     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
+
+        // Find non persistent store's topics
+        Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
+        Set<String> globalNonPersistentStoresTopics = new HashSet<>();
+        for (StateStore store : topology.globalStateStores()) {
+            if (!store.persistent() && storeToChangelogTopic.containsKey(store.name())) {
+                globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+            }
+        }
+
         checkpointableOffsets.putAll(offsets);
-        if (!checkpointableOffsets.isEmpty()) {
+
+        final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
+
+        // Skip non persistent store
+        for (Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
+            String topic = topicPartitionOffset.getKey().topic();
+            if (globalNonPersistentStoresTopics.contains(topic)) {
+                log.debug("Skipping global store' topic {}", topic);
+            } else {
+                filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
+            }
+        }
+
+        if (!filteredOffsets.isEmpty()) {
             try {
-                checkpoint.write(checkpointableOffsets);
+                checkpoint.write(filteredOffsets);
             } catch (final IOException e) {
                 log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 2ca9c21..3a1cf0d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -488,6 +488,30 @@ public class GlobalStateManagerImplTest {
         assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
     }
 
+    @Test
+    public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException {
+        stateManager.initialize();
+        initializeConsumer(10, 1, t3);
+        stateManager.register(store3, stateRestoreCallback);
+        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile);
+
+        assertThat(checkpoint.read(), equalTo(Collections.<TopicPartition, Long>emptyMap()));
+    }
+
+    @Test
+    public void shouldNotSkipGlobalInMemoryStoreOffsetsToFile() throws IOException {
+        stateManager.initialize();
+        initializeConsumer(10, 1, t1);
+        stateManager.register(store1, stateRestoreCallback);
+        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile);
+
+        assertThat(checkpoint.read(), equalTo(Collections.singletonMap(t1, 11L)));
+    }
+
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
                                                                                 ProcessorStateManager.CHECKPOINT_FILE_NAME));
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index ae46b8d..08945d5 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -95,7 +95,7 @@ public class NoOpReadOnlyStore<K, V>
 
     @Override
     public boolean persistent() {
-        return false;
+        return rocksdbStore;
     }
 
     @Override

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.