You are viewing a plain-text version of this content. The canonical version (with its original hyperlinks) is available in the mailing-list archive.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/14 11:47:39 UTC

[kafka] branch 2.0 updated: KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file (#5219)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 6fcfb59  KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file (#5219)
6fcfb59 is described below

commit 6fcfb598c01f964f83b34a767f4fbb730c1bdd5e
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Jun 14 04:44:09 2018 -0700

    KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file (#5219)
---
 .../internals/GlobalStateManagerImpl.java          | 29 ++++++++++++++++++----
 .../integration/GlobalKTableIntegrationTest.java   | 28 +++++++++++++++++++--
 .../internals/GlobalStateManagerImplTest.java      | 10 ++++++++
 .../org/apache/kafka/test/NoOpReadOnlyStore.java   |  2 +-
 4 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 78c4a36..a4ec23d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -42,6 +42,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -62,6 +63,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
     private final int retries;
     private final long retryBackoffMs;
     private final Duration pollTime;
+    private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
@@ -71,6 +73,14 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
                                   final StreamsConfig config) {
         super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
 
+        // Find non persistent store's topics
+        final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
+        for (final StateStore store : topology.globalStateStores()) {
+            if (!store.persistent()) {
+                globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+            }
+        }
+
         this.log = logContext.logger(GlobalStateManagerImpl.class);
         this.topology = topology;
         this.globalConsumer = globalConsumer;
@@ -337,13 +347,22 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
         checkpointableOffsets.putAll(offsets);
-        if (!checkpointableOffsets.isEmpty()) {
-            try {
-                checkpoint.write(checkpointableOffsets);
-            } catch (final IOException e) {
-                log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
+
+        final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
+
+        // Skip non persistent store
+        for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
+            final String topic = topicPartitionOffset.getKey().topic();
+            if (!globalNonPersistentStoresTopics.contains(topic)) {
+                filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
             }
         }
+
+        try {
+            checkpoint.write(filteredOffsets);
+        } catch (final IOException e) {
+            log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
+        }
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 900e652..013e2b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -22,13 +22,13 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -53,6 +54,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+
 @Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
     private static final int NUM_BROKERS = 1;
@@ -220,7 +224,27 @@ public class GlobalKTableIntegrationTest {
             }
         }, 30000L, "waiting for final values");
     }
-    
+
+    @Test
+    public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
+        builder = new StreamsBuilder();
+        globalTable = builder.globalTable(
+            globalTableTopic,
+            Consumed.with(Serdes.Long(), Serdes.String()),
+            Materialized.as(Stores.inMemoryKeyValueStore(globalStore)));
+
+        produceInitialGlobalTableValues();
+
+        startStreams();
+        ReadOnlyKeyValueStore<Long, String> store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
+        assertThat(store.approximateNumEntries(), equalTo(4L));
+        kafkaStreams.close();
+
+        startStreams();
+        store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
+        assertThat(store.approximateNumEntries(), equalTo(4L));
+    }
+
     private void createTopics() throws InterruptedException {
         streamTopic = "stream-" + testNo;
         globalTableTopic = "globalTable-" + testNo;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 2ca9c21..e37f6a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -488,6 +488,16 @@ public class GlobalStateManagerImplTest {
         assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
     }
 
+    @Test
+    public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException {
+        stateManager.initialize();
+        initializeConsumer(10, 1, t3);
+        stateManager.register(store3, stateRestoreCallback);
+        stateManager.close(Collections.emptyMap());
+
+        assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap()));
+    }
+
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
                                                                                 ProcessorStateManager.CHECKPOINT_FILE_NAME));
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index ae46b8d..08945d5 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -95,7 +95,7 @@ public class NoOpReadOnlyStore<K, V>
 
     @Override
     public boolean persistent() {
-        return false;
+        return rocksdbStore;
     }
 
     @Override

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.