You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 23:54:51 UTC
[kafka] 01/06: KAFKA-6711: GlobalStateManagerImpl should not write
offsets of in-memory stores in checkpoint file
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e1956f5e979938909abc65c82b10199b5abdcec7
Author: Cemo <ce...@gmail.com>
AuthorDate: Tue Mar 27 23:17:33 2018 +0300
KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
---
.../internals/GlobalStateManagerImpl.java | 28 ++++++++++++++++++++--
.../internals/GlobalStateManagerImplTest.java | 24 +++++++++++++++++++
.../org/apache/kafka/test/NoOpReadOnlyStore.java | 2 +-
3 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 78c4a36..f9f5878 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -42,6 +42,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -336,10 +337,33 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
@Override
public void checkpoint(final Map<TopicPartition, Long> offsets) {
+
+ // Find non persistent store's topics
+ Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
+ Set<String> globalNonPersistentStoresTopics = new HashSet<>();
+ for (StateStore store : topology.globalStateStores()) {
+ if (!store.persistent() && storeToChangelogTopic.containsKey(store.name())) {
+ globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+ }
+ }
+
checkpointableOffsets.putAll(offsets);
- if (!checkpointableOffsets.isEmpty()) {
+
+ final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
+
+ // Skip non persistent store
+ for (Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
+ String topic = topicPartitionOffset.getKey().topic();
+ if (globalNonPersistentStoresTopics.contains(topic)) {
+ log.debug("Skipping global store' topic {}", topic);
+ } else {
+ filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
+ }
+ }
+
+ if (!filteredOffsets.isEmpty()) {
try {
- checkpoint.write(checkpointableOffsets);
+ checkpoint.write(filteredOffsets);
} catch (final IOException e) {
log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 2ca9c21..3a1cf0d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -488,6 +488,30 @@ public class GlobalStateManagerImplTest {
assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
}
+ @Test
+ public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException {
+ stateManager.initialize();
+ initializeConsumer(10, 1, t3);
+ stateManager.register(store3, stateRestoreCallback);
+ stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+
+ final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile);
+
+ assertThat(checkpoint.read(), equalTo(Collections.<TopicPartition, Long>emptyMap()));
+ }
+
+ @Test
+ public void shouldNotSkipGlobalInMemoryStoreOffsetsToFile() throws IOException {
+ stateManager.initialize();
+ initializeConsumer(10, 1, t1);
+ stateManager.register(store1, stateRestoreCallback);
+ stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+
+ final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile);
+
+ assertThat(checkpoint.read(), equalTo(Collections.singletonMap(t1, 11L)));
+ }
+
private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
ProcessorStateManager.CHECKPOINT_FILE_NAME));
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index ae46b8d..08945d5 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -95,7 +95,7 @@ public class NoOpReadOnlyStore<K, V>
@Override
public boolean persistent() {
- return false;
+ return rocksdbStore;
}
@Override
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.