Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:52:29 UTC

[GitHub] [kafka] vamossagar12 opened a new pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

vamossagar12 opened a new pull request #10877:
URL: https://github.com/apache/kafka/pull/10877


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-880336482


   Merged to trunk and cherry-picked to 3.0 and 2.8.





[GitHub] [kafka] ableegoldman commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-880255698


   > I have added a test for IQ. For some reason, I wasn't able to run the test locally because I was getting a build failure due to Scala test classes. I will watch the status of the tests here.
   
   Apparently there was an actual issue with a Scala class on trunk; if you pull/rebase, you should be able to build locally again. Looks like the test you added did pass though, so nice 👍





[GitHub] [kafka] vamossagar12 commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-879869625


   > @vamossagar12 it looks like there's a checkstyle failure that's preventing the tests from running; can you fix that as well? I'm giving this a pass now.
   
   Oh, sorry about that. I believe the checkstyle error should go away now.





[GitHub] [kafka] mjsax commented on a change in pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r651526827



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
##########
@@ -66,6 +68,22 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
                 .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap));
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
+
+        Objects.requireNonNull(prefix);

Review comment:
       as above (similar elsewhere)







[GitHub] [kafka] ableegoldman commented on a change in pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r670001461



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
##########
@@ -845,6 +845,64 @@ public void shouldBeAbleToQueryMapValuesState() throws Exception {
         for (final KeyValue<String, String> batchEntry : batch1) {
             assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key));
         }
+
+        final KeyValueIterator<String, Long> range = myMapStore.range("hello", "kafka");
+        while (range.hasNext()) {
+            System.out.println(range.next());
+        }
+    }
+
+    @Test
+    public void shouldBeAbleToQueryKeysWithGivenPrefix() throws Exception {
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
+        final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+            Arrays.asList(
+                new KeyValue<>(keys[0], "1"),
+                new KeyValue<>(keys[1], "1"),
+                new KeyValue<>(keys[2], "3"),
+                new KeyValue<>(keys[3], "5"),
+                new KeyValue<>(keys[4], "2"))
+        );
+
+        final List<KeyValue<String, Long>> expectedPrefixScanResult = Arrays.asList(
+            new KeyValue<>(keys[3], 5L),
+            new KeyValue<>(keys[1], 1L)
+        );
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamOne,
+            batch1,
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            mockTime);
+
+        final KTable<String, String> t1 = builder.table(streamOne);
+        t1
+            .mapValues(
+                (ValueMapper<String, Long>) Long::valueOf,
+                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
+            .toStream()
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
+        startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+        waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
+
+        final ReadOnlyKeyValueStore<String, Long> myMapStore =
+            IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, keyValueStore());
+
+        int index = 0;
+        final KeyValueIterator<String, Long> range = myMapStore.prefixScan("go", Serdes.String().serializer());
+        while (range.hasNext()) {

Review comment:
       I know this is just how the other tests are doing it, but it's not really an airtight way to validate the expected results... if nothing is returned, we never enter the `while` loop and the test passes, even though we did in fact expect there to be actual output.
   
   The important thing here was just to make sure it didn't throw an exception, and it still does that, but it would be good to tighten this up, maybe in a follow-up PR.
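
   A minimal sketch of a tighter check (illustrative only, not part of the PR diff; it reuses `myMapStore` and `expectedPrefixScanResult` from the test above and assumes JUnit's `assertEquals`): drain the iterator into a list, so an unexpectedly empty result fails the comparison instead of silently skipping the loop.

       final List<KeyValue<String, Long>> actualPrefixScanResult = new ArrayList<>();
       // KeyValueIterator is Closeable, so try-with-resources releases it even if an assertion fails
       try (final KeyValueIterator<String, Long> range =
                myMapStore.prefixScan("go", Serdes.String().serializer())) {
           while (range.hasNext()) {
               actualPrefixScanResult.add(range.next());
           }
       }
       // fails if the iterator returned nothing, or the wrong entries or order
       assertEquals(expectedPrefixScanResult, actualPrefixScanResult);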

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
##########
@@ -66,6 +68,20 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
                 .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap));
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       1) Could you just treat them as Bytes all the same, and convert to/from an Integer before putting/getting them from the store? That way you're still just handling Bytes like you are in this test; it just goes through an extra layer of de/serialization. You should be able to more or less copy over the existing tests with just a bit of extra code (a rough sketch follows below). Can you try this in a follow-up PR?
   2) Yes, I was just suggesting to merge them as a possible way to make things easier and do less work; if it's going to be more work, then please do file a separate ticket for it.
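
   To make the suggestion in 1) concrete, a rough sketch (names and setup are illustrative, not from the PR; the usual imports are assumed) of keeping a Bytes/byte[]-keyed store and converting Integer keys at the test boundary with the plain Kafka serializers:

       final IntegerSerializer intSerializer = new IntegerSerializer();
       final IntegerDeserializer intDeserializer = new IntegerDeserializer();

       // Integer -> Bytes when writing to a Bytes/byte[] store such as MemoryNavigableLRUCache
       final Bytes key = Bytes.wrap(intSerializer.serialize("", 42));
       store.put(key, "some-value".getBytes(StandardCharsets.UTF_8));

       // prefixScan accepts the Integer prefix plus its serializer directly; note that with
       // fixed-width 4-byte integer keys this particular "prefix" matches only the key 42 itself
       try (final KeyValueIterator<Bytes, byte[]> iter = store.prefixScan(42, intSerializer)) {
           while (iter.hasNext()) {
               final KeyValue<Bytes, byte[]> entry = iter.next();
               // Bytes -> Integer when reading back, so assertions can be written against Integer keys
               final Integer originalKey = intDeserializer.deserialize("", entry.key.get());
           }
       }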

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -383,6 +387,1002 @@ public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {

Review comment:
       Ah, I see. I thought it was trying to test the underlying `prefixScan` functionality itself, but what Matthias said makes sense. These tests do still feel very out of place here, but I can't think of a better place to put them. Maybe just create a new file and put them in a separate test class of their own?







[GitHub] [kafka] ableegoldman merged pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #10877:
URL: https://github.com/apache/kafka/pull/10877


   





[GitHub] [kafka] vamossagar12 commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-877398722


   Thanks @ableegoldman, I have fixed the merge conflict. Please review whenever you get the chance.





[GitHub] [kafka] ableegoldman commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-878765736


   @vamossagar12 it looks like there's a checkstyle failure that's preventing the tests from running; can you fix that as well? I'm giving this a pass now.





[GitHub] [kafka] mjsax commented on a change in pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r651526170



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -106,6 +108,8 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
 
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
+        Objects.requireNonNull(prefix);
+        Objects.requireNonNull(prefixKeySerializer);

Review comment:
       Should we not check for `null` in the outer `MeteredXxxStore`? It should be redundant to check here a second time.
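
   For reference, a hedged sketch of the pattern being suggested: validate both arguments once in the outer metered layer and simply delegate to the wrapped Bytes store, so inner implementations like this one can assume non-null inputs. The iterator wrapper and sensor names below are assumptions for illustration, not necessarily the exact `MeteredKeyValueStore` code.

       @Override
       public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix,
                                                                              final PS prefixKeySerializer) {
           // single null check at the outermost layer
           Objects.requireNonNull(prefix, "prefix cannot be null");
           Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
           // delegate to the wrapped Bytes/byte[] store; no second check needed further down
           return new MeteredKeyValueIterator(wrapped().prefixScan(prefix, prefixKeySerializer), prefixScanSensor);
       }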







[GitHub] [kafka] vamossagar12 commented on a change in pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r669536336



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -383,6 +387,1002 @@ public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {

Review comment:
       Actually, @mjsax had asked me to test all possible flavours of the state stores here in this comment:
   https://github.com/apache/kafka/pull/10877#pullrequestreview-683669139
   I thought this would be the right place to do that. Let me know if it isn't.







[GitHub] [kafka] ableegoldman commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-876104606


   @vamossagar12 it looks like there are now merge conflicts; can you fix those? I'll try to have a look soon, and hopefully @mjsax can as well. Also, can you confirm whether this is (or should be) a blocker for 3.0 or not?





[GitHub] [kafka] vamossagar12 commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-864382529


   @mjsax, I have added the tests for the combinations that you mentioned.





[GitHub] [kafka] ableegoldman commented on a change in pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r668417652



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
##########
@@ -66,6 +68,20 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
                 .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap));
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       Can you add a test or two for this in `InMemoryLRUCacheStoreTest`? Alternatively, I think you should be able to just collect all the tests in `InMemoryKeyValueStoreTest` and `CachingInMemoryKeyValueStoreTest` and move them over to
   `AbstractKeyValueStoreTest` instead. That way you get test coverage for both of those plus a handful of other store classes at once, without having to copy the same test over and over across a bunch of different files.
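
   A rough sketch of what such a shared test could look like (hedged: the `store` field and `createKeyValueStore()` factory follow the `AbstractKeyValueStoreTest` pattern described above but are assumptions here, and it uses String keys for readable prefixes, whereas the existing abstract class is Integer-keyed -- see the reply further down):

       @Test
       public void shouldReturnKeysWithGivenPrefix() {
           // 'store' is whatever the concrete subclass's createKeyValueStore() produced
           store.put("k1", "a");
           store.put("k2", "b");
           store.put("other", "c");

           final List<String> prefixedKeys = new ArrayList<>();
           try (final KeyValueIterator<String, String> iter = store.prefixScan("k", new StringSerializer())) {
               while (iter.hasNext()) {
                   prefixedKeys.add(iter.next().key);
               }
           }
           assertEquals(Arrays.asList("k1", "k2"), prefixedKeys);
       }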

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -26,13 +26,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
-import java.util.List;
 import java.util.NavigableMap;
-import java.util.Set;
 import java.util.TreeMap;
+import java.util.List;

Review comment:
       Can you revert the changes in this file?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -383,6 +387,1002 @@ public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreNoCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingDisabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingNoLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingDisabled();
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentStoreWithCachingWithLogging() {
+        final String storeName = "prefixScanStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String())
+                .withCachingEnabled()
+                .withLoggingEnabled(Collections.emptyMap());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
+
+        driver = new TopologyTestDriver(topology, props);
+
+        final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
+        final TestOutputTopic<Integer, String> outputTopic1 =
+            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
+
+        inputTopic.pipeInput("key1", "value1");
+        inputTopic.pipeInput("key2", "value2");
+        inputTopic.pipeInput("key3", "value3");
+        inputTopic.pipeInput("key1", "value4");
+        assertTrue(outputTopic1.isEmpty());
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("prefixScanStore");
+        final KeyValueIterator<String, String> prefixScan = store.prefixScan("key", Serdes.String().serializer());
+        final List<KeyValue<String, String>> results = new ArrayList<>();
+        while (prefixScan.hasNext()) {
+            final KeyValue<String, String> next = prefixScan.next();
+            results.add(next);
+        }
+
+        assertEquals("key1", results.get(0).key);
+        assertEquals("value4", results.get(0).value);
+        assertEquals("key2", results.get(1).key);
+        assertEquals("value2", results.get(1).value);
+        assertEquals("key3", results.get(2).key);
+        assertEquals("value3", results.get(2).value);
+
+    }
+
+    @Test
+    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {

Review comment:
       I'm not sure this is really the right place to test the `prefixScan` functionality for all of these different store types; this test class is really more for making sure the topology itself is all wired up correctly. If you're just trying to test a method on a specific store type, that generally makes sense to do in the test class for that store itself. In other words, you don't need a separate test here for each underlying store type (e.g. `PersistentTimestampedStore` or `LruMap`, etc.); there are dedicated test classes for that (like `RocksDBTimestampedStoreTest` or `InMemoryLRUCacheStoreTest`).
   
   That said, it sounds like the original bug report uncovered the missing implementations "when accessing the state stores through the processor context" -- which does sound like it could/would be reproduced through a test here. Maybe you can just pick a store type and write a single test that reproduces the issue when run without this patch, and I would consider that sufficient.
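
   A hedged sketch of that kind of single reproducing test (illustrative only; the processor below is hypothetical and uses the newer `processor.api` interfaces): reach the store through the `ProcessorContext` inside a processor, so the `prefixScan` call goes through the full metered/caching/logging wrapping that was missing the implementation before this patch.

       public class PrefixScanningProcessor implements Processor<String, String, String, String> {
           private KeyValueStore<String, String> store;

           @Override
           public void init(final ProcessorContext<String, String> context) {
               // obtains the fully wrapped store via the context, as in the original bug report
               store = context.getStateStore("prefixScanStore");
           }

           @Override
           public void process(final Record<String, String> record) {
               store.put(record.key(), record.value());
               // without the missing overrides, this is the call that used to fail
               try (final KeyValueIterator<String, String> iter =
                        store.prefixScan("key", new StringSerializer())) {
                   while (iter.hasNext()) {
                       iter.next();
                   }
               }
           }
       }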







[GitHub] [kafka] vamossagar12 commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-879870668


   > I think this looks pretty good (as far as I can tell), but the important thing is going to be the tests, since it's hard to verify just by looking whether we hit every place this is needed. One thing I was wondering was whether we have any tests for this with IQ (Interactive Queries) -- if not, an integration test that tries to run `prefixScan` using IQ should definitely be added
   
   I have added a test for IQ. For some reason, I wasn't able to run the test locally because I was getting a build failure due to Scala test classes. I will watch the status of the tests here.





[GitHub] [kafka] vamossagar12 commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-876082544


   @mjsax, @ableegoldman, did you get a chance to look at this PR?





[GitHub] [kafka] mjsax commented on a change in pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r651526827



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
##########
@@ -66,6 +68,22 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
                 .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap));
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
+
+        Objects.requireNonNull(prefix);

Review comment:
       as above







[GitHub] [kafka] vamossagar12 commented on a change in pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r669539634



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
##########
@@ -66,6 +68,20 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
                 .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap));
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       1) I guess the reason I didn't add a test in `InMemoryLRUCacheStoreTest` was that the key type defined there is Integer, and prefixScan, like range/reverseRange, relies on byte lexicographical ordering rather than on the ordering of the key type (see the illustration below). So, IMO, such a test won't yield much for this case; if needed, I can add it. But I think the tests for reverse and reverseRange are also missing from this class for the same reason, or it could just be a miss :)
   2) Regarding merging the test classes into one, I see that both of those classes override the `createKeyValueStore` method differently, so merging them would, in my view, need some analysis. Is it OK if we create a separate ticket and tackle it there?
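
   To illustrate the byte-ordering point in 1) (a standalone example, not from the PR): `IntegerSerializer` produces 4-byte big-endian arrays, so a byte-level prefix has nothing to do with the decimal digits of the key.

       import org.apache.kafka.common.serialization.IntegerSerializer;
       import java.util.Arrays;

       public class IntegerPrefixExample {
           public static void main(final String[] args) {
               final IntegerSerializer serializer = new IntegerSerializer();
               System.out.println(Arrays.toString(serializer.serialize("", 1)));    // [0, 0, 0, 1]
               System.out.println(Arrays.toString(serializer.serialize("", 256)));  // [0, 0, 1, 0]
               System.out.println(Arrays.toString(serializer.serialize("", 257)));  // [0, 0, 1, 1]
               // 256 and 257 share the 3-byte prefix [0, 0, 1]; 1 does not. The grouping follows
               // the raw bytes, not the decimal digits, which is why String keys make prefix tests
               // much more natural.
           }
       }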



