You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/26 00:21:21 UTC
kafka git commit: MINOR: initialize Serdes with ProcessorContext
Repository: kafka
Updated Branches:
refs/heads/trunk 69a1cced4 -> 5e8958a85
MINOR: initialize Serdes with ProcessorContext
guozhangwang
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #589 from ymatsuda/init_serdes_with_procctx
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5e8958a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5e8958a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5e8958a8
Branch: refs/heads/trunk
Commit: 5e8958a856a5b4ccbdcb610473cab4c2eeddbac5
Parents: 69a1cce
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Wed Nov 25 15:21:17 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 25 15:21:17 2015 -0800
----------------------------------------------------------------------
.../kafka/streams/examples/ProcessorJob.java | 2 +-
.../streams/state/MeteredKeyValueStore.java | 1 +
.../state/RocksDBKeyValueStoreSupplier.java | 2 +
.../org/apache/kafka/streams/state/Serdes.java | 61 +++++++-------------
.../org/apache/kafka/streams/state/Stores.java | 5 +-
.../internals/ProcessorTopologyTest.java | 2 +-
.../state/AbstractKeyValueStoreTest.java | 12 ++--
.../state/InMemoryKeyValueStoreTest.java | 6 +-
.../state/InMemoryLRUCacheStoreTest.java | 7 ++-
.../streams/state/KeyValueStoreTestDriver.java | 9 ---
.../streams/state/RocksDBKeyValueStoreTest.java | 6 +-
11 files changed, 40 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 3274aae..882c7ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -104,7 +104,7 @@ public class ProcessorJob {
builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
- builder.addStateStore(Stores.create("local-state", config).withStringKeys().withIntegerValues().inMemory().build());
+ builder.addStateStore(Stores.create("local-state").withStringKeys().withIntegerValues().inMemory().build());
builder.connectProcessorAndStateStores("local-state", "PROCESS");
builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index c1ccbd4..b68f763 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -91,6 +91,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name);
this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
+ serialization.init(context);
this.context = context;
this.partition = context.id().partition;
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
index fe8f00a..f1fbd9f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
@@ -118,6 +118,8 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
public void init(ProcessorContext context) {
+ serdes.init(context);
+
this.context = context;
this.partition = context.id().partition;
this.dbName = this.topic + "." + this.partition;
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
index 31bd439..f41d928 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
final class Serdes<K, V> {
@@ -57,19 +57,21 @@ final class Serdes<K, V> {
}
private final String topic;
- private final Serializer<K> keySerializer;
- private final Serializer<V> valueSerializer;
- private final Deserializer<K> keyDeserializer;
- private final Deserializer<V> valueDeserializer;
+ private Serializer<K> keySerializer;
+ private Serializer<V> valueSerializer;
+ private Deserializer<K> keyDeserializer;
+ private Deserializer<V> valueDeserializer;
/**
- * Create a context for serialization using the specified serializers and deserializers.
+ * Create a context for serialization using the specified serializers and deserializers, or if any of them are null the
+ * corresponding {@link ProcessorContext}'s serializer or deserializer, which
+ * <em>must</em> match the key and value types used as parameters for this object.
*
* @param topic the name of the topic
- * @param keySerializer the serializer for keys; may not be null
- * @param keyDeserializer the deserializer for keys; may not be null
- * @param valueSerializer the serializer for values; may not be null
- * @param valueDeserializer the deserializer for values; may not be null
+ * @param keySerializer the serializer for keys; may be null
+ * @param keyDeserializer the deserializer for keys; may be null
+ * @param valueSerializer the serializer for values; may be null
+ * @param valueDeserializer the deserializer for values; may be null
*/
public Serdes(String topic,
Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
@@ -82,45 +84,22 @@ final class Serdes<K, V> {
}
/**
- * Create a context for serialization using the specified serializers and deserializers, or if any of them are null the
- * corresponding {@link StreamingConfig}'s serializer or deserializer, which
+ * Create a context for serialization using the {@link ProcessorContext}'s serializers and deserializers, which
* <em>must</em> match the key and value types used as parameters for this object.
*
* @param topic the name of the topic
- * @param keySerializer the serializer for keys; may be null if the {@link StreamingConfig#keySerializer() default
- * key serializer} should be used
- * @param keyDeserializer the deserializer for keys; may be null if the {@link StreamingConfig#keyDeserializer() default
- * key deserializer} should be used
- * @param valueSerializer the serializer for values; may be null if the {@link StreamingConfig#valueSerializer() default
- * value serializer} should be used
- * @param valueDeserializer the deserializer for values; may be null if the {@link StreamingConfig#valueDeserializer()
- * default value deserializer} should be used
- * @param config the streaming config
*/
@SuppressWarnings("unchecked")
- public Serdes(String topic,
- Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
- Serializer<V> valueSerializer, Deserializer<V> valueDeserializer,
- StreamingConfig config) {
- this.topic = topic;
-
- this.keySerializer = keySerializer != null ? keySerializer : config.keySerializer();
- this.keyDeserializer = keyDeserializer != null ? keyDeserializer : config.keyDeserializer();
- this.valueSerializer = valueSerializer != null ? valueSerializer : config.valueSerializer();
- this.valueDeserializer = valueDeserializer != null ? valueDeserializer : config.valueDeserializer();
+ public Serdes(String topic) {
+ this(topic, null, null, null, null);
}
- /**
- * Create a context for serialization using the {@link StreamingConfig}'s serializers and deserializers, which
- * <em>must</em> match the key and value types used as parameters for this object.
- *
- * @param topic the name of the topic
- * @param config the streaming config
- */
@SuppressWarnings("unchecked")
- public Serdes(String topic,
- StreamingConfig config) {
- this(topic, null, null, null, null, config);
+ public void init(ProcessorContext context) {
+ keySerializer = keySerializer != null ? keySerializer : (Serializer<K>) context.keySerializer();
+ keyDeserializer = keyDeserializer != null ? keyDeserializer : (Deserializer<K>) context.keyDeserializer();
+ valueSerializer = valueSerializer != null ? valueSerializer : (Serializer<V>) context.valueSerializer();
+ valueDeserializer = valueDeserializer != null ? valueDeserializer : (Deserializer<V>) context.valueDeserializer();
}
public Deserializer<K> keyDeserializer() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c5f040f..5452040 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.StateStoreSupplier;
/**
@@ -40,7 +39,7 @@ public class Stores {
* @param name the name of the store
* @return the factory that can be used to specify other options or configurations for the store; never null
*/
- public static StoreFactory create(final String name, final StreamingConfig config) {
+ public static StoreFactory create(final String name) {
return new StoreFactory() {
@Override
public <K> ValueFactory<K> withKeys(final Serializer<K> keySerializer, final Deserializer<K> keyDeserializer) {
@@ -49,7 +48,7 @@ public class Stores {
public <V> KeyValueFactory<K, V> withValues(final Serializer<V> valueSerializer,
final Deserializer<V> valueDeserializer) {
final Serdes<K, V> serdes =
- new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer, config);
+ new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
return new KeyValueFactory<K, V>() {
@Override
public InMemoryKeyValueFactory<K, V> inMemory() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 54096b2..2f359bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -204,7 +204,7 @@ public class ProcessorTopologyTest {
return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
.addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
.addStateStore(
- Stores.create(storeName, config).withStringKeys().withStringValues().inMemory().build(),
+ Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
"processor"
)
.addSink("counts", OUTPUT_TOPIC_1, "processor");
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
index 209f3c9..d40f308 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
@@ -21,14 +21,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
-import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.junit.Test;
public abstract class AbstractKeyValueStoreTest {
- protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(StreamingConfig config,
- ProcessorContext context,
+ protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
Class<K> keyClass, Class<V> valueClass,
boolean useContextSerdes);
@@ -36,7 +34,7 @@ public abstract class AbstractKeyValueStoreTest {
public void testPutGetRange() {
// Create the test driver ...
KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false);
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
try {
// Verify that the store reads and writes correctly ...
@@ -102,7 +100,7 @@ public abstract class AbstractKeyValueStoreTest {
public void testPutGetRangeWithDefaultSerdes() {
// Create the test driver ...
KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true);
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
try {
// Verify that the store reads and writes correctly ...
@@ -152,7 +150,7 @@ public abstract class AbstractKeyValueStoreTest {
// Create the store, which should register with the context and automatically
// receive the restore entries ...
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false);
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
try {
// Verify that the store's contents were properly restored ...
assertEquals(0, driver.checkForRestoredEntries(store));
@@ -178,7 +176,7 @@ public abstract class AbstractKeyValueStoreTest {
// Create the store, which should register with the context and automatically
// receive the restore entries ...
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true);
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
try {
// Verify that the store's contents were properly restored ...
assertEquals(0, driver.checkForRestoredEntries(store));
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
index b3fe98c..2b90d0a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
@@ -27,7 +26,6 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(
- StreamingConfig config,
ProcessorContext context,
Class<K> keyClass, Class<V> valueClass,
boolean useContextSerdes) {
@@ -38,9 +36,9 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
- supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
+ supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
} else {
- supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).inMemory().build();
+ supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
}
KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
index dddb9c7..81adfad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
@@ -26,11 +26,12 @@ import org.junit.Test;
public class InMemoryLRUCacheStoreTest {
+ @SuppressWarnings("unchecked")
@Test
public void testPutGetRange() {
// Create the test driver ...
KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
- StateStoreSupplier supplier = Stores.create("my-store", driver.config())
+ StateStoreSupplier supplier = Stores.create("my-store")
.withIntegerKeys().withStringValues()
.inMemory().maxEntries(3)
.build();
@@ -82,7 +83,7 @@ public class InMemoryLRUCacheStoreTest {
Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
- StateStoreSupplier supplier = Stores.create("my-store", driver.config())
+ StateStoreSupplier supplier = Stores.create("my-store")
.withKeys(keySer, keyDeser)
.withValues(valSer, valDeser)
.inMemory().maxEntries(3)
@@ -138,7 +139,7 @@ public class InMemoryLRUCacheStoreTest {
// Create the store, which should register with the context and automatically
// receive the restore entries ...
- StateStoreSupplier supplier = Stores.create("my-store", driver.config())
+ StateStoreSupplier supplier = Stores.create("my-store")
.withIntegerKeys().withStringValues()
.inMemory().maxEntries(3)
.build();
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 8bab1c9..28cc3af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -362,15 +362,6 @@ public class KeyValueStoreTestDriver<K, V> {
}
/**
- * Get the streaming config that should be supplied to a {@link Serdes}'s constructor.
- *
- * @return the streaming config; never null
- */
- public StreamingConfig config() {
- return config;
- }
-
- /**
* Get the context that should be supplied to a {@link KeyValueStore}'s constructor. This context records any messages
* written by the store to the Kafka topic, making them available via the {@link #flushedEntryStored(Object)} and
* {@link #flushedEntryRemoved(Object)} methods.
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
index 37a12f9..20e92ef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
@@ -27,7 +26,6 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(
- StreamingConfig config,
ProcessorContext context,
Class<K> keyClass,
Class<V> valueClass,
@@ -39,9 +37,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
- supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
+ supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
} else {
- supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).localDatabase().build();
+ supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build();
}
KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();