You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/02 00:15:00 UTC

[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

    [ https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349560#comment-16349560 ] 

ASF GitHub Bot commented on KAFKA-6487:
---------------------------------------

guozhangwang closed pull request #4495: KAFKA-6487: ChangeLoggingKeyValueBytesStore does not propagate delete
URL: https://github.com/apache/kafka/pull/4495
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 8dc457a9949..94ee275a3bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -77,8 +77,8 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
 
     @Override
     public byte[] delete(final Bytes key) {
-        final byte[] oldValue = inner.get(key);
-        put(key, null);
+        final byte[] oldValue = inner.delete(key);
+        changeLogger.logChange(key, null);
         return oldValue;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
deleted file mode 100644
index ea9f7aa8713..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
-
-import java.util.ArrayList;
-import java.util.List;
-
-class ChangeLoggingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
-    private final ChangeLoggingKeyValueBytesStore innerBytes;
-    private final Serde keySerde;
-    private final Serde valueSerde;
-    private StateSerdes<K, V> serdes;
-
-
-    ChangeLoggingKeyValueStore(final KeyValueStore<Bytes, byte[]> bytesStore,
-                               final Serde keySerde,
-                               final Serde valueSerde) {
-        this(new ChangeLoggingKeyValueBytesStore(bytesStore), keySerde, valueSerde);
-    }
-
-    private ChangeLoggingKeyValueStore(final ChangeLoggingKeyValueBytesStore bytesStore,
-                                       final Serde keySerde,
-                                       final Serde valueSerde) {
-        super(bytesStore);
-        this.innerBytes = bytesStore;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public void init(final ProcessorContext context, final StateStore root) {
-        innerBytes.init(context, root);
-
-        serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), innerBytes.name()),
-                                   keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                                   valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-    }
-
-    @Override
-    public long approximateNumEntries() {
-        return innerBytes.approximateNumEntries();
-    }
-
-    @Override
-    public void put(final K key, final V value) {
-        final Bytes bytesKey = Bytes.wrap(serdes.rawKey(key));
-        final byte[] bytesValue = serdes.rawValue(value);
-        innerBytes.put(bytesKey, bytesValue);
-    }
-
-    @Override
-    public V putIfAbsent(final K key, final V value) {
-        final V v = get(key);
-        if (v == null) {
-            put(key, value);
-        }
-        return v;
-    }
-
-    @Override
-    public void putAll(final List<KeyValue<K, V>> entries) {
-        final List<KeyValue<Bytes, byte[]>> keyValues = new ArrayList<>();
-        for (final KeyValue<K, V> entry : entries) {
-            keyValues.add(KeyValue.pair(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)));
-        }
-        innerBytes.putAll(keyValues);
-    }
-
-    @Override
-    public V delete(final K key) {
-        final byte[] oldValue = innerBytes.delete(Bytes.wrap(serdes.rawKey(key)));
-        if (oldValue == null) {
-            return null;
-        }
-        return serdes.valueFrom(oldValue);
-    }
-
-    @Override
-    public V get(final K key) {
-        final byte[] rawValue = innerBytes.get(Bytes.wrap(serdes.rawKey(key)));
-        if (rawValue == null) {
-            return null;
-        }
-        return serdes.valueFrom(rawValue);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> range(final K from, final K to) {
-        return new SerializedKeyValueIterator<>(innerBytes.range(Bytes.wrap(serdes.rawKey(from)),
-                                                                 Bytes.wrap(serdes.rawKey(to))),
-                                                                 serdes);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> all() {
-        return new SerializedKeyValueIterator<>(innerBytes.all(), serdes);
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 65a9dec892b..398c4c5062c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -27,14 +27,14 @@
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
 
 public abstract class AbstractKeyValueStoreTest {
 
-
     protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
                                                                       Class<K> keyClass, Class<V> valueClass,
                                                                       boolean useContextSerdes);
@@ -58,6 +58,15 @@ public void after() {
         driver.clear();
     }
 
+    private static Map<Integer, String> getContents(final KeyValueIterator<Integer, String> iter) {
+        final HashMap<Integer, String> result = new HashMap<>();
+        while (iter.hasNext()) {
+            KeyValue<Integer, String> entry = iter.next();
+            result.put(entry.key, entry.value);
+        }
+        return result;
+    }
+
     @Test
     public void testPutGetRange() {
         // Verify that the store reads and writes correctly ...
@@ -74,6 +83,7 @@ public void testPutGetRange() {
         assertEquals("four", store.get(4));
         assertEquals("five", store.get(5));
         store.delete(5);
+        assertEquals(4, driver.sizeOf(store));
 
         // Flush the store and verify all current entries were properly flushed ...
         store.flush();
@@ -89,31 +99,18 @@ public void testPutGetRange() {
         assertEquals(false, driver.flushedEntryRemoved(4));
         assertEquals(true, driver.flushedEntryRemoved(5));
 
-        // Check range iteration ...
-        try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
-            while (iter.hasNext()) {
-                KeyValue<Integer, String> entry = iter.next();
-                if (entry.key.equals(2))
-                    assertEquals("two", entry.value);
-                else if (entry.key.equals(4))
-                    assertEquals("four", entry.value);
-                else
-                    fail("Unexpected entry: " + entry);
-            }
-        }
+        final HashMap<Integer, String> expectedContents = new HashMap<>();
+        expectedContents.put(2, "two");
+        expectedContents.put(4, "four");
 
         // Check range iteration ...
-        try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
-            while (iter.hasNext()) {
-                KeyValue<Integer, String> entry = iter.next();
-                if (entry.key.equals(2))
-                    assertEquals("two", entry.value);
-                else if (entry.key.equals(4))
-                    assertEquals("four", entry.value);
-                else
-                    fail("Unexpected entry: " + entry);
-            }
-        }
+        assertEquals(expectedContents, getContents(store.range(2, 4)));
+        assertEquals(expectedContents, getContents(store.range(2, 6)));
+
+        // Check all iteration ...
+        expectedContents.put(0, "zero");
+        expectedContents.put(1, "one");
+        assertEquals(expectedContents, getContents(store.all()));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index cf0792712f6..9360daef272 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -109,9 +109,10 @@ public void shouldLogChangesOnPutAll() {
     }
 
     @Test
-    public void shouldPutNullOnDelete() {
+    public void shouldPropagateDelete() {
         store.put(hi, there);
         store.delete(hi);
+        assertThat(inner.approximateNumEntries(), equalTo(0L));
         assertThat(inner.get(hi), nullValue());
     }
 
@@ -125,6 +126,7 @@ public void shouldReturnOldValueOnDelete() {
     public void shouldLogKeyNullOnDelete() {
         store.put(hi, there);
         store.delete(hi);
+        assertThat(sent.containsKey(hi), is(true));
         assertThat(sent.get(hi), nullValue());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
deleted file mode 100644
index 8190fd241f3..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.test.MockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-
-public class ChangeLoggingKeyValueStoreTest {
-
-    private MockProcessorContext context;
-    private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv", Serdes.Bytes(), Serdes.ByteArray());
-    private final Serde<String> keySerde = Serdes.String();
-    private final Serde<String> valueSerde = Serdes.String();
-    private final ChangeLoggingKeyValueStore<String, String> store
-            = new ChangeLoggingKeyValueStore<>(inner, keySerde, valueSerde);
-    private final Map sent = new HashMap<>();
-    private final String hi = "hi";
-    private final Bytes hiBytes = Bytes.wrap(hi.getBytes());
-    private final String there = "there";
-    private final byte[] thereBytes = "there".getBytes();
-    private final String hello = "hello";
-    private final String world = "world";
-
-    @Before
-    public void before() {
-        final NoOpRecordCollector collector = new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    K key,
-                                    V value,
-                                    Integer partition,
-                                    Long timestamp,
-                                    Serializer<K> keySerializer,
-                                    Serializer<V> valueSerializer) {
-                sent.put(key, value);
-            }
-        };
-        context = new MockProcessorContext(
-            TestUtils.tempDirectory(),
-            Serdes.String(),
-            Serdes.Long(),
-            collector,
-            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
-        context.setTime(0);
-        store.init(context, store);
-    }
-
-    @After
-    public void after() {
-        context.close();
-        store.close();
-    }
-
-    @Test
-    public void shouldWriteKeyValueBytesToInnerStoreOnPut() {
-        store.put(hi, there);
-        assertThat(deserializedValueFromInner(hi), equalTo(there));
-    }
-
-    @Test
-    public void shouldLogChangeOnPut() {
-        store.put(hi, there);
-        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldWriteAllKeyValueToInnerStoreOnPutAll() {
-        store.putAll(Arrays.asList(KeyValue.pair(hello, world),
-                                   KeyValue.pair(hi, there)));
-        assertThat(deserializedValueFromInner(hello), equalTo(world));
-        assertThat(deserializedValueFromInner(hi), equalTo(there));
-    }
-
-    @Test
-    public void shouldLogChangesOnPutAll() {
-        store.putAll(Arrays.asList(KeyValue.pair(hi, there),
-                                   KeyValue.pair(hello, world)));
-        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
-        assertThat((byte[]) sent.get(Bytes.wrap(hello.getBytes())), equalTo(world.getBytes()));
-    }
-
-    @Test
-    public void shouldPutNullOnDelete() {
-        store.put(hi, there);
-        store.delete(hi);
-        assertThat(inner.get(hiBytes), nullValue());
-    }
-
-    @Test
-    public void shouldReturnOldValueOnDelete() {
-        store.put(hi, there);
-        assertThat(store.delete(hi), equalTo(there));
-    }
-
-    @Test
-    public void shouldReturnNullOnDeleteIfNoOldValue() {
-        assertThat(store.delete(hi), is(nullValue()));
-    }
-
-    @Test
-    public void shouldLogKeyNullOnDelete() {
-        store.put(hi, there);
-        store.delete(hi);
-        assertThat(sent.get(hi), nullValue());
-    }
-
-    @Test
-    public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() {
-        store.putIfAbsent(hi, there);
-        assertThat(inner.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() {
-        store.put(hi, there);
-        store.putIfAbsent(hi, world);
-        assertThat(inner.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() {
-        store.putIfAbsent(hi, there);
-        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() {
-        store.put(hi, there);
-        store.putIfAbsent(hi, world);
-        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldReturnCurrentValueOnPutIfAbsent() {
-        store.put(hi, there);
-        assertThat(store.putIfAbsent(hi, world), equalTo(there));
-    }
-
-    @Test
-    public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() {
-        assertThat(store.putIfAbsent(hi, there), is(nullValue()));
-    }
-
-    @Test
-    public void shouldQueryRange() {
-        store.put(hello, world);
-        store.put(hi, there);
-        store.put("zooom", "home");
-        final KeyValueIterator<String, String> range = store.range(hello, hi);
-        assertThat(range.next(), equalTo(KeyValue.pair(hello, world)));
-        assertThat(range.next(), equalTo(KeyValue.pair(hi, there)));
-        assertFalse(range.hasNext());
-    }
-
-    @Test
-    public void shouldReturnAllKeyValues() {
-        store.put(hello, world);
-        store.put(hi, there);
-        final String zooom = "zooom";
-        final String home = "home";
-        store.put(zooom, home);
-        final KeyValueIterator<String, String> all = store.all();
-        assertThat(all.next(), equalTo(KeyValue.pair(hello, world)));
-        assertThat(all.next(), equalTo(KeyValue.pair(hi, there)));
-        assertThat(all.next(), equalTo(KeyValue.pair(zooom, home)));
-        assertFalse(all.hasNext());
-    }
-
-    @Test
-    public void shouldReturnValueOnGetWhenExists() {
-        store.put(hello, world);
-        assertThat(store.get(hello), equalTo(world));
-    }
-
-    @Test
-    public void shouldReturnNullOnGetWhenDoesntExist() {
-        assertThat(store.get(hello), is(nullValue()));
-    }
-
-    @Test
-    public void shouldReturnInnerStoreName() {
-        assertThat(store.name(), equalTo("kv"));
-    }
-
-    private String deserializedValueFromInner(final String key) {
-        return valueSerde.deserializer().deserialize("blah", inner.get(Bytes.wrap(key.getBytes())));
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index 3607c9e1107..d24a90fd857 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
 
@@ -39,18 +42,27 @@
             Class<V> valueClass,
             boolean useContextSerdes) {
 
-        StateStoreSupplier supplier;
+        final Serde<K> keySerde;
+        final Serde<V> valueSerde;
+
         if (useContextSerdes) {
-            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde())
-                .inMemory().enableLogging(Collections.singletonMap("retention.ms", "1000")).build();
+            keySerde = (Serde<K>) context.keySerde();
+            valueSerde = (Serde<V>) context.valueSerde();
         } else {
-            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass)
-                .inMemory().enableLogging(Collections.singletonMap("retention.ms", "1000")).build();
+            keySerde = Serdes.serdeFrom(keyClass);
+            valueSerde = Serdes.serdeFrom(valueClass);
         }
 
-        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("my-store"),
+                keySerde,
+                valueSerde)
+                .withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
+
+        final StateStore store = storeBuilder.build();
         store.init(context, store);
-        return store;
+
+        return (KeyValueStore<K, V>) store;
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> ChangeLoggingKeyValueBytesStore.all() returns null
> --------------------------------------------------
>
>                 Key: KAFKA-6487
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6487
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Bill Bejeck
>            Assignee: Bart De Vylder
>            Priority: Major
>
> The {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} interface, which extends the {{ReadOnlyKeyValueStore}} interface. The Javadoc for {{ReadOnlyKeyValueStore#all}} states that the method should never return a {{null}} value.
> However, after deleting a record from the {{ChangeLoggingKeyValueBytesStore}}, a subsequent call to the {{all}} method returns a null value.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)