You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/02 00:14:26 UTC

[kafka] branch trunk updated: KAFKA-6487: ChangeLoggingKeyValueBytesStore does not propagate delete (#4495)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b0132c3  KAFKA-6487: ChangeLoggingKeyValueBytesStore does not propagate delete (#4495)
b0132c3 is described below

commit b0132c31e91bb090ff012c726ac4ed92566ce7fb
Author: bartdevylder <ba...@gmail.com>
AuthorDate: Fri Feb 2 01:14:22 2018 +0100

    KAFKA-6487: ChangeLoggingKeyValueBytesStore does not propagate delete (#4495)
    
    The ChangeLoggingKeyValueBytesStore used to write null to its underlying store instead of propagating the delete, which has two drawbacks:
    
    * an iterator will see null values
    * unbounded memory growth of the underlying in-memory key-value store
    
    The fix will just propagate the delete instead of performing put(key, null).
    
    The changes to the tests:
    
    * extra test checking that the key is really gone after a delete, by calling approximateNumEntries() on the underlying store. This number is exact because we know the underlying store in the test is of type InMemoryKeyValueStore
    * extra test to check a delete is logged as <key, null> (the existing test would also succeed if the key is just absent)
    
    While updating the corresponding tests of ChangeLoggingKeyValueStore, I noticed that the class is no longer used anywhere, so I removed it from the source code for clarity.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../internals/ChangeLoggingKeyValueBytesStore.java |   4 +-
 .../internals/ChangeLoggingKeyValueStore.java      | 123 -----------
 .../state/internals/AbstractKeyValueStoreTest.java |  47 ++---
 .../ChangeLoggingKeyValueBytesStoreTest.java       |   4 +-
 .../internals/ChangeLoggingKeyValueStoreTest.java  | 225 ---------------------
 .../internals/InMemoryKeyValueLoggedStoreTest.java |  28 ++-
 6 files changed, 47 insertions(+), 384 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 8dc457a..94ee275 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -77,8 +77,8 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS
 
     @Override
     public byte[] delete(final Bytes key) {
-        final byte[] oldValue = inner.get(key);
-        put(key, null);
+        final byte[] oldValue = inner.delete(key);
+        changeLogger.logChange(key, null);
         return oldValue;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
deleted file mode 100644
index ea9f7aa..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
-
-import java.util.ArrayList;
-import java.util.List;
-
-class ChangeLoggingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
-    private final ChangeLoggingKeyValueBytesStore innerBytes;
-    private final Serde keySerde;
-    private final Serde valueSerde;
-    private StateSerdes<K, V> serdes;
-
-
-    ChangeLoggingKeyValueStore(final KeyValueStore<Bytes, byte[]> bytesStore,
-                               final Serde keySerde,
-                               final Serde valueSerde) {
-        this(new ChangeLoggingKeyValueBytesStore(bytesStore), keySerde, valueSerde);
-    }
-
-    private ChangeLoggingKeyValueStore(final ChangeLoggingKeyValueBytesStore bytesStore,
-                                       final Serde keySerde,
-                                       final Serde valueSerde) {
-        super(bytesStore);
-        this.innerBytes = bytesStore;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public void init(final ProcessorContext context, final StateStore root) {
-        innerBytes.init(context, root);
-
-        serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), innerBytes.name()),
-                                   keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                                   valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-    }
-
-    @Override
-    public long approximateNumEntries() {
-        return innerBytes.approximateNumEntries();
-    }
-
-    @Override
-    public void put(final K key, final V value) {
-        final Bytes bytesKey = Bytes.wrap(serdes.rawKey(key));
-        final byte[] bytesValue = serdes.rawValue(value);
-        innerBytes.put(bytesKey, bytesValue);
-    }
-
-    @Override
-    public V putIfAbsent(final K key, final V value) {
-        final V v = get(key);
-        if (v == null) {
-            put(key, value);
-        }
-        return v;
-    }
-
-    @Override
-    public void putAll(final List<KeyValue<K, V>> entries) {
-        final List<KeyValue<Bytes, byte[]>> keyValues = new ArrayList<>();
-        for (final KeyValue<K, V> entry : entries) {
-            keyValues.add(KeyValue.pair(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)));
-        }
-        innerBytes.putAll(keyValues);
-    }
-
-    @Override
-    public V delete(final K key) {
-        final byte[] oldValue = innerBytes.delete(Bytes.wrap(serdes.rawKey(key)));
-        if (oldValue == null) {
-            return null;
-        }
-        return serdes.valueFrom(oldValue);
-    }
-
-    @Override
-    public V get(final K key) {
-        final byte[] rawValue = innerBytes.get(Bytes.wrap(serdes.rawKey(key)));
-        if (rawValue == null) {
-            return null;
-        }
-        return serdes.valueFrom(rawValue);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> range(final K from, final K to) {
-        return new SerializedKeyValueIterator<>(innerBytes.range(Bytes.wrap(serdes.rawKey(from)),
-                                                                 Bytes.wrap(serdes.rawKey(to))),
-                                                                 serdes);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> all() {
-        return new SerializedKeyValueIterator<>(innerBytes.all(), serdes);
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 65a9dec..398c4c5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -27,14 +27,14 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
 
 public abstract class AbstractKeyValueStoreTest {
 
-
     protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
                                                                       Class<K> keyClass, Class<V> valueClass,
                                                                       boolean useContextSerdes);
@@ -58,6 +58,15 @@ public abstract class AbstractKeyValueStoreTest {
         driver.clear();
     }
 
+    private static Map<Integer, String> getContents(final KeyValueIterator<Integer, String> iter) {
+        final HashMap<Integer, String> result = new HashMap<>();
+        while (iter.hasNext()) {
+            KeyValue<Integer, String> entry = iter.next();
+            result.put(entry.key, entry.value);
+        }
+        return result;
+    }
+
     @Test
     public void testPutGetRange() {
         // Verify that the store reads and writes correctly ...
@@ -74,6 +83,7 @@ public abstract class AbstractKeyValueStoreTest {
         assertEquals("four", store.get(4));
         assertEquals("five", store.get(5));
         store.delete(5);
+        assertEquals(4, driver.sizeOf(store));
 
         // Flush the store and verify all current entries were properly flushed ...
         store.flush();
@@ -89,31 +99,18 @@ public abstract class AbstractKeyValueStoreTest {
         assertEquals(false, driver.flushedEntryRemoved(4));
         assertEquals(true, driver.flushedEntryRemoved(5));
 
-        // Check range iteration ...
-        try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
-            while (iter.hasNext()) {
-                KeyValue<Integer, String> entry = iter.next();
-                if (entry.key.equals(2))
-                    assertEquals("two", entry.value);
-                else if (entry.key.equals(4))
-                    assertEquals("four", entry.value);
-                else
-                    fail("Unexpected entry: " + entry);
-            }
-        }
+        final HashMap<Integer, String> expectedContents = new HashMap<>();
+        expectedContents.put(2, "two");
+        expectedContents.put(4, "four");
 
         // Check range iteration ...
-        try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
-            while (iter.hasNext()) {
-                KeyValue<Integer, String> entry = iter.next();
-                if (entry.key.equals(2))
-                    assertEquals("two", entry.value);
-                else if (entry.key.equals(4))
-                    assertEquals("four", entry.value);
-                else
-                    fail("Unexpected entry: " + entry);
-            }
-        }
+        assertEquals(expectedContents, getContents(store.range(2, 4)));
+        assertEquals(expectedContents, getContents(store.range(2, 6)));
+
+        // Check all iteration ...
+        expectedContents.put(0, "zero");
+        expectedContents.put(1, "one");
+        assertEquals(expectedContents, getContents(store.all()));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index cf07927..9360dae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -109,9 +109,10 @@ public class ChangeLoggingKeyValueBytesStoreTest {
     }
 
     @Test
-    public void shouldPutNullOnDelete() {
+    public void shouldPropagateDelete() {
         store.put(hi, there);
         store.delete(hi);
+        assertThat(inner.approximateNumEntries(), equalTo(0L));
         assertThat(inner.get(hi), nullValue());
     }
 
@@ -125,6 +126,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
     public void shouldLogKeyNullOnDelete() {
         store.put(hi, there);
         store.delete(hi);
+        assertThat(sent.containsKey(hi), is(true));
         assertThat(sent.get(hi), nullValue());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
deleted file mode 100644
index 8190fd2..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.test.MockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-
-public class ChangeLoggingKeyValueStoreTest {
-
-    private MockProcessorContext context;
-    private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv", Serdes.Bytes(), Serdes.ByteArray());
-    private final Serde<String> keySerde = Serdes.String();
-    private final Serde<String> valueSerde = Serdes.String();
-    private final ChangeLoggingKeyValueStore<String, String> store
-            = new ChangeLoggingKeyValueStore<>(inner, keySerde, valueSerde);
-    private final Map sent = new HashMap<>();
-    private final String hi = "hi";
-    private final Bytes hiBytes = Bytes.wrap(hi.getBytes());
-    private final String there = "there";
-    private final byte[] thereBytes = "there".getBytes();
-    private final String hello = "hello";
-    private final String world = "world";
-
-    @Before
-    public void before() {
-        final NoOpRecordCollector collector = new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    K key,
-                                    V value,
-                                    Integer partition,
-                                    Long timestamp,
-                                    Serializer<K> keySerializer,
-                                    Serializer<V> valueSerializer) {
-                sent.put(key, value);
-            }
-        };
-        context = new MockProcessorContext(
-            TestUtils.tempDirectory(),
-            Serdes.String(),
-            Serdes.Long(),
-            collector,
-            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
-        context.setTime(0);
-        store.init(context, store);
-    }
-
-    @After
-    public void after() {
-        context.close();
-        store.close();
-    }
-
-    @Test
-    public void shouldWriteKeyValueBytesToInnerStoreOnPut() {
-        store.put(hi, there);
-        assertThat(deserializedValueFromInner(hi), equalTo(there));
-    }
-
-    @Test
-    public void shouldLogChangeOnPut() {
-        store.put(hi, there);
-        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldWriteAllKeyValueToInnerStoreOnPutAll() {
-        store.putAll(Arrays.asList(KeyValue.pair(hello, world),
-                                   KeyValue.pair(hi, there)));
-        assertThat(deserializedValueFromInner(hello), equalTo(world));
-        assertThat(deserializedValueFromInner(hi), equalTo(there));
-    }
-
-    @Test
-    public void shouldLogChangesOnPutAll() {
-        store.putAll(Arrays.asList(KeyValue.pair(hi, there),
-                                   KeyValue.pair(hello, world)));
-        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
-        assertThat((byte[]) sent.get(Bytes.wrap(hello.getBytes())), equalTo(world.getBytes()));
-    }
-
-    @Test
-    public void shouldPutNullOnDelete() {
-        store.put(hi, there);
-        store.delete(hi);
-        assertThat(inner.get(hiBytes), nullValue());
-    }
-
-    @Test
-    public void shouldReturnOldValueOnDelete() {
-        store.put(hi, there);
-        assertThat(store.delete(hi), equalTo(there));
-    }
-
-    @Test
-    public void shouldReturnNullOnDeleteIfNoOldValue() {
-        assertThat(store.delete(hi), is(nullValue()));
-    }
-
-    @Test
-    public void shouldLogKeyNullOnDelete() {
-        store.put(hi, there);
-        store.delete(hi);
-        assertThat(sent.get(hi), nullValue());
-    }
-
-    @Test
-    public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() {
-        store.putIfAbsent(hi, there);
-        assertThat(inner.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() {
-        store.put(hi, there);
-        store.putIfAbsent(hi, world);
-        assertThat(inner.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() {
-        store.putIfAbsent(hi, there);
-        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() {
-        store.put(hi, there);
-        store.putIfAbsent(hi, world);
-        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldReturnCurrentValueOnPutIfAbsent() {
-        store.put(hi, there);
-        assertThat(store.putIfAbsent(hi, world), equalTo(there));
-    }
-
-    @Test
-    public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() {
-        assertThat(store.putIfAbsent(hi, there), is(nullValue()));
-    }
-
-    @Test
-    public void shouldQueryRange() {
-        store.put(hello, world);
-        store.put(hi, there);
-        store.put("zooom", "home");
-        final KeyValueIterator<String, String> range = store.range(hello, hi);
-        assertThat(range.next(), equalTo(KeyValue.pair(hello, world)));
-        assertThat(range.next(), equalTo(KeyValue.pair(hi, there)));
-        assertFalse(range.hasNext());
-    }
-
-    @Test
-    public void shouldReturnAllKeyValues() {
-        store.put(hello, world);
-        store.put(hi, there);
-        final String zooom = "zooom";
-        final String home = "home";
-        store.put(zooom, home);
-        final KeyValueIterator<String, String> all = store.all();
-        assertThat(all.next(), equalTo(KeyValue.pair(hello, world)));
-        assertThat(all.next(), equalTo(KeyValue.pair(hi, there)));
-        assertThat(all.next(), equalTo(KeyValue.pair(zooom, home)));
-        assertFalse(all.hasNext());
-    }
-
-    @Test
-    public void shouldReturnValueOnGetWhenExists() {
-        store.put(hello, world);
-        assertThat(store.get(hello), equalTo(world));
-    }
-
-    @Test
-    public void shouldReturnNullOnGetWhenDoesntExist() {
-        assertThat(store.get(hello), is(nullValue()));
-    }
-
-    @Test
-    public void shouldReturnInnerStoreName() {
-        assertThat(store.name(), equalTo("kv"));
-    }
-
-    private String deserializedValueFromInner(final String key) {
-        return valueSerde.deserializer().deserialize("blah", inner.get(Bytes.wrap(key.getBytes())));
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index 3607c9e..d24a90f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
 
@@ -39,18 +42,27 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
             Class<V> valueClass,
             boolean useContextSerdes) {
 
-        StateStoreSupplier supplier;
+        final Serde<K> keySerde;
+        final Serde<V> valueSerde;
+
         if (useContextSerdes) {
-            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde())
-                .inMemory().enableLogging(Collections.singletonMap("retention.ms", "1000")).build();
+            keySerde = (Serde<K>) context.keySerde();
+            valueSerde = (Serde<V>) context.valueSerde();
         } else {
-            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass)
-                .inMemory().enableLogging(Collections.singletonMap("retention.ms", "1000")).build();
+            keySerde = Serdes.serdeFrom(keyClass);
+            valueSerde = Serdes.serdeFrom(valueClass);
         }
 
-        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("my-store"),
+                keySerde,
+                valueSerde)
+                .withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
+
+        final StateStore store = storeBuilder.build();
         store.init(context, store);
-        return store;
+
+        return (KeyValueStore<K, V>) store;
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.