You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/14 21:21:52 UTC
[kafka] branch 0.11.0 updated: KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file (#5219)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push:
new a8e48b3 KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file (#5219)
a8e48b3 is described below
commit a8e48b3f95e032ecce4036e00165c34f331b5f31
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Jun 14 04:44:09 2018 -0700
KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file (#5219)
---
.../internals/GlobalStateManagerImpl.java | 24 +++++++++++++++++--
.../integration/GlobalKTableIntegrationTest.java | 27 +++++++++++++++++++++-
.../internals/GlobalStateManagerImplTest.java | 19 +++++++++++++--
.../org/apache/kafka/test/NoOpReadOnlyStore.java | 11 +++++++--
4 files changed, 74 insertions(+), 7 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 1e2e5ff..73999c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -59,6 +59,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
private final OffsetCheckpoint checkpoint;
private final Set<String> globalStoreNames = new HashSet<>();
private final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
+ private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
public GlobalStateManagerImpl(final ProcessorTopology topology,
final Consumer<byte[], byte[]> consumer,
@@ -68,6 +69,14 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
this.stateDirectory = stateDirectory;
this.baseDir = stateDirectory.globalStateDir();
this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
+
+ // Collect the changelog topics of all non-persistent global stores
+ final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
+ for (final StateStore store : topology.globalStateStores()) {
+ if (!store.persistent()) {
+ globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+ }
+ }
}
@Override
@@ -230,9 +239,20 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
@Override
public void checkpoint(final Map<TopicPartition, Long> offsets) {
checkpointableOffsets.putAll(offsets);
- if (!checkpointableOffsets.isEmpty()) {
+
+ final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
+
+ // Skip offsets whose topic belongs to a non-persistent store
+ for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
+ final String topic = topicPartitionOffset.getKey().topic();
+ if (!globalNonPersistentStoresTopics.contains(topic)) {
+ filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
+ }
+ }
+
+ if (!filteredOffsets.isEmpty()) {
try {
- checkpoint.write(checkpointableOffsets);
+ checkpoint.write(filteredOffsets);
} catch (IOException e) {
log.warn("failed to write offsets checkpoint for global stores", e);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 9a849f5..b3df990 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -51,6 +52,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+
@Category({IntegrationTest.class})
public class GlobalKTableIntegrationTest {
private static final int NUM_BROKERS = 1;
@@ -217,7 +221,28 @@ public class GlobalKTableIntegrationTest {
}
}, 30000L, "waiting for final values");
}
-
+
+ @Test
+ public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
+ builder = new KStreamBuilder();
+ globalTable = builder.globalTable(
+ Serdes.Long(),
+ Serdes.String(),
+ globalOne,
+ new InMemoryKeyValueStoreSupplier<>(globalStore, Serdes.Long(), Serdes.String(), false, null));
+
+ produceInitialGlobalTableValues();
+
+ startStreams();
+ ReadOnlyKeyValueStore<Long, String> store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+ assertThat(store.approximateNumEntries(), equalTo(4L));
+ kafkaStreams.close();
+
+ startStreams();
+ store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+ assertThat(store.approximateNumEntries(), equalTo(4L));
+ }
+
private void createTopics() throws InterruptedException {
inputStream = "input-stream-" + testNo;
inputTable = "input-table-" + testNo;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 98ef8f6a..b4b2cc5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -63,12 +63,14 @@ public class GlobalStateManagerImplTest {
private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
private final TopicPartition t1 = new TopicPartition("t1", 1);
private final TopicPartition t2 = new TopicPartition("t2", 1);
+ private final TopicPartition t3 = new TopicPartition("t3", 1);
private GlobalStateManagerImpl stateManager;
private NoOpProcessorContext context;
private StateDirectory stateDirectory;
private String stateDirPath;
private NoOpReadOnlyStore<Object, Object> store1;
private NoOpReadOnlyStore store2;
+ private NoOpReadOnlyStore store3;
private MockConsumer<byte[], byte[]> consumer;
private File checkpointFile;
private ProcessorTopology topology;
@@ -78,18 +80,21 @@ public class GlobalStateManagerImplTest {
final Map<String, String> storeToTopic = new HashMap<>();
storeToTopic.put("t1-store", "t1");
storeToTopic.put("t2-store", "t2");
+ storeToTopic.put("t3-store", "t3");
final Map<StateStore, ProcessorNode> storeToProcessorNode = new HashMap<>();
store1 = new NoOpReadOnlyStore<>("t1-store");
storeToProcessorNode.put(store1, new MockProcessorNode(-1));
store2 = new NoOpReadOnlyStore("t2-store");
storeToProcessorNode.put(store2, new MockProcessorNode(-1));
+ store3 = new NoOpReadOnlyStore("t3-store", false);
+ storeToProcessorNode.put(store3, new MockProcessorNode(-1));
topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
Collections.<String, SourceNode>emptyMap(),
Collections.<String, SinkNode>emptyMap(),
Collections.<StateStore>emptyList(),
storeToTopic,
- Arrays.<StateStore>asList(store1, store2));
+ Arrays.<StateStore>asList(store1, store2, store3));
context = new NoOpProcessorContext();
stateDirPath = TestUtils.tempDirectory().getPath();
@@ -153,7 +158,7 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldReturnInitializedStoreNames() throws Exception {
final Set<String> storeNames = stateManager.initialize(context);
- assertEquals(Utils.mkSet(store1.name(), store2.name()), storeNames);
+ assertEquals(Utils.mkSet(store1.name(), store2.name(), store3.name()), storeNames);
}
@Test
@@ -439,6 +444,16 @@ public class GlobalStateManagerImplTest {
assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
}
+ @Test
+ public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException {
+ stateManager.initialize(context);
+ initializeConsumer(10, 1, t3);
+ stateManager.register(store3, false, stateRestoreCallback);
+ stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+
+ assertThat(readOffsetsCheckpoint(), equalTo(Collections.<TopicPartition, Long>emptyMap()));
+ }
+
private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
ProcessorStateManager.CHECKPOINT_FILE_NAME));
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index 0ada2e4..892a4dc 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -25,17 +25,24 @@ public class NoOpReadOnlyStore<K, V>
implements ReadOnlyKeyValueStore<K, V>, StateStore {
private final String name;
+ private final boolean persistent;
private boolean open = true;
public boolean initialized;
public boolean flushed;
public NoOpReadOnlyStore() {
- this("");
+ this("", true);
}
public NoOpReadOnlyStore(final String name) {
+ this(name, true);
+ }
+
+ public NoOpReadOnlyStore(final String name,
+ final boolean persistent) {
this.name = name;
+ this.persistent = persistent;
}
@Override
@@ -80,7 +87,7 @@ public class NoOpReadOnlyStore<K, V>
@Override
public boolean persistent() {
- return false;
+ return persistent;
}
@Override
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.