Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/13 04:55:15 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

ableegoldman commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r668417652



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
##########
@@ -66,6 +68,20 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
                 .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap));
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       Can you add a test or two for this in `InMemoryLRUCacheStoreTest`? Alternatively, I think you should be able to just collect all the tests in `InMemoryKeyValueStoreTest` and `CachingInMemoryKeyValueStoreTest` and move them over to `AbstractKeyValueStoreTest` instead. That way you get test coverage for both of those plus a handful of other store classes at once, without having to copy the same test over and over across a bunch of different files.
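
To illustrate the shared-test idea, here is a minimal, self-contained sketch of the pattern. It does not use the real Kafka test classes or JUnit: `SimpleStore`, `AbstractContract`, and the two store implementations are hypothetical stand-ins, and the check is a plain boolean method rather than a `@Test`. The point is only that the `prefixScan` check is written once in the abstract base, and each store type contributes nothing but a factory method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SharedPrefixScanContract {

    // Hypothetical minimal store interface (not the Kafka Streams KeyValueStore).
    interface SimpleStore {
        void put(String key, String value);
        // Returns the matching keys in ascending order.
        List<String> prefixScan(String prefix);
    }

    // The check is written once here; every store type inherits it.
    abstract static class AbstractContract {
        abstract SimpleStore createStore();

        boolean prefixScanReturnsMatchingKeysInOrder() {
            final SimpleStore store = createStore();
            store.put("k2", "b");
            store.put("other", "x");
            store.put("k1", "a");
            return store.prefixScan("k").equals(Arrays.asList("k1", "k2"));
        }
    }

    // Sorted store: keys sharing a prefix are contiguous, so stop at the first mismatch.
    static class TreeMapStore implements SimpleStore {
        private final TreeMap<String, String> map = new TreeMap<>();

        public void put(final String key, final String value) {
            map.put(key, value);
        }

        public List<String> prefixScan(final String prefix) {
            final List<String> keys = new ArrayList<>();
            for (final String key : map.tailMap(prefix, true).keySet()) {
                if (!key.startsWith(prefix)) {
                    break;
                }
                keys.add(key);
            }
            return keys;
        }
    }

    // Unsorted store: filter every key, then sort the matches.
    static class InsertionOrderStore implements SimpleStore {
        private final Map<String, String> map = new LinkedHashMap<>();

        public void put(final String key, final String value) {
            map.put(key, value);
        }

        public List<String> prefixScan(final String prefix) {
            final List<String> keys = new ArrayList<>();
            for (final String key : map.keySet()) {
                if (key.startsWith(prefix)) {
                    keys.add(key);
                }
            }
            Collections.sort(keys);
            return keys;
        }
    }

    // Each concrete "test class" only says which store to build.
    static class TreeMapContract extends AbstractContract {
        SimpleStore createStore() {
            return new TreeMapStore();
        }
    }

    static class InsertionOrderContract extends AbstractContract {
        SimpleStore createStore() {
            return new InsertionOrderStore();
        }
    }

    public static void main(final String[] args) {
        System.out.println(new TreeMapContract().prefixScanReturnsMatchingKeysInOrder());
        System.out.println(new InsertionOrderContract().prefixScanReturnsMatchingKeysInOrder());
    }
}
```

With this shape, covering another store type means adding one small subclass instead of another copy of the test body.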

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -26,13 +26,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
-import java.util.List;
 import java.util.NavigableMap;
-import java.util.Set;
 import java.util.TreeMap;
+import java.util.List;

Review comment:
       Can you revert the changes in this file?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -383,6 +387,1002 @@ public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {

Review comment:
       I'm not sure this is really the right place to test the `prefixScan` functionality for all of these different store types. This test class is really more for making sure the topology itself is wired up correctly. If you're just trying to test a method on a specific store type, it generally makes sense to do that in the test class for that store itself. In other words, you don't need a separate test here for each underlying store type (e.g. `PersistentTimestampedStore` or `LruMap`); there are dedicated test classes for that (like `RocksDBTimestampedStoreTest` or `InMemoryLRUCacheStoreTest`).
   
   That said, it sounds like the original bug report uncovered the missing implementations "when accessing the state stores through the processor context", which does sound like something that could be reproduced through a test here. Maybe you can just pick a store type and write a single test that reproduces the issue when run without this patch. I would consider that sufficient for this.
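
As a rough illustration of what a single reproducing check could look like, here is a self-contained sketch. It rests on an assumption the comment above does not confirm: that the failure comes from a wrapper layer which forwards most calls to the inner store but leaves `prefixScan` at an interface default that throws `UnsupportedOperationException`. All names (`Store`, `InnerStore`, `ForgetfulWrapper`, `DelegatingWrapper`) are hypothetical and are not the Kafka Streams classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

class WrapperDelegationSketch {

    // Hypothetical store interface whose default prefixScan is unsupported (assumption).
    interface Store {
        void put(String key, String value);

        default List<String> prefixScan(final String prefix) {
            throw new UnsupportedOperationException("prefixScan is not implemented");
        }
    }

    // The underlying store implements prefixScan properly.
    static class InnerStore implements Store {
        private final TreeMap<String, String> map = new TreeMap<>();

        public void put(final String key, final String value) {
            map.put(key, value);
        }

        @Override
        public List<String> prefixScan(final String prefix) {
            final List<String> keys = new ArrayList<>();
            for (final String key : map.tailMap(prefix, true).keySet()) {
                if (!key.startsWith(prefix)) {
                    break;
                }
                keys.add(key);
            }
            return keys;
        }
    }

    // Wrapper "before the patch": forwards put but inherits the throwing default.
    static class ForgetfulWrapper implements Store {
        private final Store inner;

        ForgetfulWrapper(final Store inner) {
            this.inner = inner;
        }

        public void put(final String key, final String value) {
            inner.put(key, value);
        }
    }

    // Wrapper "after the patch": forwards prefixScan as well.
    static class DelegatingWrapper implements Store {
        private final Store inner;

        DelegatingWrapper(final Store inner) {
            this.inner = inner;
        }

        public void put(final String key, final String value) {
            inner.put(key, value);
        }

        @Override
        public List<String> prefixScan(final String prefix) {
            return inner.prefixScan(prefix);
        }
    }

    // The single check: it only ever talks to the wrapper, as a processor would.
    static boolean prefixScanWorksThrough(final Store wrapped) {
        wrapped.put("key1", "value1");
        wrapped.put("key2", "value2");
        wrapped.put("other", "x");
        try {
            return wrapped.prefixScan("key").equals(Arrays.asList("key1", "key2"));
        } catch (final UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println(prefixScanWorksThrough(new ForgetfulWrapper(new InnerStore())));
        System.out.println(prefixScanWorksThrough(new DelegatingWrapper(new InnerStore())));
    }
}
```

One check of this kind fails against the unpatched wrapper and passes against the patched one, which is the property asked for above; the per-store behaviour of `prefixScan` itself stays in the dedicated store test classes.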



