Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/04 11:33:31 UTC

[GitHub] [kafka] vamossagar12 opened a new pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

vamossagar12 opened a new pull request #10052:
URL: https://github.com/apache/kafka/pull/10052




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r577360887



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix(){
+        store = createKeyValueStore(driver.context());
+        final String value = "value";
+        final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, value));
+        entries.add(new KeyValue<>(2, value));
+        entries.add(new KeyValue<>(11, value));
+        entries.add(new KeyValue<>(13, value));
+
+        store.putAll(entries);
+        final KeyValueIterator<Integer, String> keysWithPrefix = store.prefixScan(1, new IntegerSerializer());

Review comment:
       Thanks @cadonna, @ableegoldman for the detailed explanation. I understand the behaviour now.







[GitHub] [kafka] cadonna commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r570207387



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix(){
+        store = createKeyValueStore(driver.context());
+        final String value = "value";
+        final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, value));
+        entries.add(new KeyValue<>(2, value));
+        entries.add(new KeyValue<>(11, value));
+        entries.add(new KeyValue<>(13, value));
+
+        store.putAll(entries);
+        final KeyValueIterator<Integer, String> keysWithPrefix = store.prefixScan(1, new IntegerSerializer());

Review comment:
       Thank you for pointing this out!
   I will have a look at this in the next few days.







[GitHub] [kafka] guozhangwang commented on pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#issuecomment-789094199


   LGTM. Thanks for the contribution @vamossagar12 !





[GitHub] [kafka] cadonna commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r577363631



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix(){
+        store = createKeyValueStore(driver.context());
+        final String value = "value";
+        final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, value));
+        entries.add(new KeyValue<>(2, value));
+        entries.add(new KeyValue<>(11, value));
+        entries.add(new KeyValue<>(13, value));
+
+        store.putAll(entries);
+        final KeyValueIterator<Integer, String> keysWithPrefix = store.prefixScan(1, new IntegerSerializer());

Review comment:
       That sounds good!







[GitHub] [kafka] vamossagar12 commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r575374620



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix(){
+        store = createKeyValueStore(driver.context());
+        final String value = "value";
+        final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, value));
+        entries.add(new KeyValue<>(2, value));
+        entries.add(new KeyValue<>(11, value));
+        entries.add(new KeyValue<>(13, value));
+
+        store.putAll(entries);
+        final KeyValueIterator<Integer, String> keysWithPrefix = store.prefixScan(1, new IntegerSerializer());

Review comment:
       @cadonna, I was thinking about this today, and my question is what exactly a prefix scan means for numeric keys. The way we have implemented it, it will work even for numeric keys if we can retrieve the keys in lexicographical order.
   So, if the prefix is `1` and, after the increment, it becomes `2`, then the range I need to search becomes [1, 2). If my map has 1, 2, 11, 13, then I should be able to fetch 1, 11, 13, as those come before 2 lexicographically. But when I test the underlying map from my test case, it returns 1, 2, 11, 13, and that is why the range scan is failing. What are your thoughts?







[GitHub] [kafka] vamossagar12 commented on pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#issuecomment-781994641


   @cadonna, as discussed, I have added the test cases using a store with `Bytes` keys. I have also added the warning, with the example, to the Javadoc for prefixScan().





[GitHub] [kafka] vamossagar12 commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r570171267



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix(){
+        store = createKeyValueStore(driver.context());
+        final String value = "value";
+        final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, value));
+        entries.add(new KeyValue<>(2, value));
+        entries.add(new KeyValue<>(11, value));
+        entries.add(new KeyValue<>(13, value));
+
+        store.putAll(entries);
+        final KeyValueIterator<Integer, String> keysWithPrefix = store.prefixScan(1, new IntegerSerializer());

Review comment:
       @cadonna I found something interesting here. As you can see, this creates a state store with Integer keys. Now when I run prefixScan, I get only 1 from the iterator even though we also have 11 and 13 as keys. This is because in prefixScan we do this: `final Bytes to = Bytes.increment(from);`, which sets `to` to 2.
   Because of that, it doesn't return all the keys.
   The question is: with prefix scan we are primarily looking for the lexicographically next keys that share the same prefix. By that logic, 11 should be returned. But with the way it has been implemented for this store, it won't work.
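
To make the observation above concrete, here is a minimal stand-alone sketch in plain Java. It does not use the Kafka classes: `serialize` mimics a 4-byte big-endian integer encoding (what `IntegerSerializer` is assumed to produce), `increment` is a simplified stand-in for `Bytes.increment`, and the half-open range `[from, to)` over a `TreeMap` is an assumption about how the in-memory store performs the scan. All class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

final class IntegerPrefixScanSketch {

    // Unsigned, byte-wise lexicographic order (assumed to match how the store compares keys).
    static final Comparator<byte[]> BYTE_ORDER = Arrays::compareUnsigned;

    // Hypothetical stand-in for IntegerSerializer: 4 bytes, most significant byte first.
    static byte[] serialize(final int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Simplified stand-in for Bytes.increment: add one to the last byte, carrying leftwards.
    static byte[] increment(final byte[] input) {
        final byte[] result = Arrays.copyOf(input, input.length);
        for (int i = result.length - 1; i >= 0; i--) {
            if (result[i] != (byte) 0xFF) {
                result[i]++;
                return result;
            }
            result[i] = 0x00;
        }
        throw new IllegalArgumentException("cannot increment: all bytes are 0xFF");
    }

    // Returns the keys whose serialized form lies in [serialize(prefix), increment(serialize(prefix))).
    static List<Integer> prefixScan(final int prefix, final int... keys) {
        final TreeMap<byte[], Integer> map = new TreeMap<>(BYTE_ORDER);
        for (final int key : keys) {
            map.put(serialize(key), key);
        }
        final byte[] from = serialize(prefix);
        final byte[] to = increment(from);
        return new ArrayList<>(map.subMap(from, true, to, false).values());
    }

    public static void main(final String[] args) {
        // Keys 1, 2, 11, 13 with prefix 1: only 00 00 00 01 falls into [00 00 00 01, 00 00 00 02).
        System.out.println(prefixScan(1, 1, 2, 11, 13)); // prints [1]
    }
}
```

Under these assumptions 11 (`00 00 00 0b`) and 13 (`00 00 00 0d`) sort after the upper bound 2 (`00 00 00 02`), which would explain why only 1 comes back.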







[GitHub] [kafka] ableegoldman commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r577140223



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix(){
+        store = createKeyValueStore(driver.context());
+        final String value = "value";
+        final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, value));
+        entries.add(new KeyValue<>(2, value));
+        entries.add(new KeyValue<>(11, value));
+        entries.add(new KeyValue<>(13, value));
+
+        store.putAll(entries);
+        final KeyValueIterator<Integer, String> keysWithPrefix = store.prefixScan(1, new IntegerSerializer());

Review comment:
       Yeah, the underlying store compares the serialized bytes lexicographically; it doesn't have any concept of "Integer" or any other type. And the really tricky thing is that it scans lexicographically, which means from left to right, whereas when we serialize numbers the significant digits are aligned to the right. E.g. `2` in binary is `10`, whereas 11 in binary is `1011` and 13 is `1101`.
   The problem here is that the serialized forms of 2 and of 11/13 are all padded to the same fixed width, so the lexicographical comparator effectively orders them by numeric magnitude rather than by their decimal digits. 11 and 13 therefore sort after 2, outside the scanned range.







[GitHub] [kafka] vamossagar12 commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r575374620



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix(){
+        store = createKeyValueStore(driver.context());
+        final String value = "value";
+        final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, value));
+        entries.add(new KeyValue<>(2, value));
+        entries.add(new KeyValue<>(11, value));
+        entries.add(new KeyValue<>(13, value));
+
+        store.putAll(entries);
+        final KeyValueIterator<Integer, String> keysWithPrefix = store.prefixScan(1, new IntegerSerializer());

Review comment:
       @cadonna, I was thinking about this today, and my question is what exactly a prefix scan means for numeric keys. The way we have implemented it, it will work even for numeric keys if we can retrieve the keys in lexicographical order.
   So, if the prefix is `1` and, after the increment, it becomes `2`, then the range I need to search becomes [1, 2). If my map has 1, 2, 11, 13, then I should be able to fetch 1, 11, 13, as those come before 2 lexicographically. But when I test the underlying map from my test case, it returns 1, 2, 11, 13, and that is why the range scan is failing.
   
   In the test class, the KV store being created has Integer keys, but the underlying map is still lexicographically ordered. So, ideally, it should have returned the keys lexicographically.







[GitHub] [kafka] vamossagar12 commented on pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#issuecomment-785663227


   Thanks @cadonna, I have committed your suggestions.





[GitHub] [kafka] cadonna commented on pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#issuecomment-785752484


   Call for committer approval: @guozhangwang @ableegoldman @mjsax @vvcephei @abbccdda 





[GitHub] [kafka] guozhangwang merged pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #10052:
URL: https://github.com/apache/kafka/pull/10052


   





[GitHub] [kafka] cadonna commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r581735306



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +97,131 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix() {
+
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k1")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_3")),
+            stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k2")),
+            stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_2")),
+            stringSerializer.serialize(null, "d")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k3")),
+            stringSerializer.serialize(null, "e")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_1")),
+            stringSerializer.serialize(null, "f")));
+
+        byteStore.putAll(entries);
+        byteStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan("prefix", stringSerializer);
+        final List<String> valuesWithPrefix = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        assertThat(numberOfKeysReturned, is(3));
+        assertThat(valuesWithPrefix.get(0), is("f"));
+        assertThat(valuesWithPrefix.get(1), is("d"));
+        assertThat(valuesWithPrefix.get(2), is("b"));
+    }
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "abc")),
+            stringSerializer.serialize(null, "f")));
+
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "abcd")),
+            stringSerializer.serialize(null, "f")));
+
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "abce")),
+            stringSerializer.serialize(null, "f")));
+
+        byteStore.putAll(entries);
+        byteStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd = byteStore.prefixScan("abcd", stringSerializer);
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefixAsabcd.hasNext()) {
+            keysWithPrefixAsabcd.next().key.get();
+            numberOfKeysReturned++;
+        }
+
+        assertThat(numberOfKeysReturned, is(1));
+    }
+
+    @Test
+    public void shouldReturnUUIDsWithStringPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        final Serializer<UUID> uuidSerializer = Serdes.UUID().serializer();
+        final UUID uuid1 = UUID.randomUUID();
+        final UUID uuid2 = UUID.randomUUID();
+        final String prefix = uuid1.toString().substring(0, 4);
+        entries.add(new KeyValue<>(
+            new Bytes(uuidSerializer.serialize(null, uuid1)),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(uuidSerializer.serialize(null, uuid2)),
+            stringSerializer.serialize(null, "b")));
+
+        byteStore.putAll(entries);
+        byteStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan(prefix, stringSerializer);
+        final List<String> valuesWithPrefix = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+
+        assertThat(numberOfKeysReturned, is(1));
+        assertThat(valuesWithPrefix.get(0), is("a"));
+    }
+
+    @Test
+    public void shouldReturnNoKeys() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "a")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "b")),
+            stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "c")),
+            stringSerializer.serialize(null, "e")));
+        byteStore.putAll(entries);
+        byteStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan("d", stringSerializer);

Review comment:
       Here, I would use `bb` as the prefix to cover the case where a key is a prefix of the prefix.  
   
   ```suggestion
           final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan("bb", stringSerializer);
   ```
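
The edge case behind the `bb` suggestion can be sketched in plain Java without the Kafka classes. This is only an illustration under assumptions: keys are UTF-8 encoded strings, the scan is a half-open range `[prefix, prefix with its last byte incremented)` over a byte-ordered `TreeMap`, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

final class StringPrefixEdgeCaseSketch {

    // Returns the keys that fall into the byte range spanned by a non-empty prefix.
    static List<String> prefixScan(final String prefix, final String... keys) {
        final Comparator<byte[]> byteOrder = Arrays::compareUnsigned;
        final TreeMap<byte[], String> map = new TreeMap<>(byteOrder);
        for (final String key : keys) {
            map.put(key.getBytes(StandardCharsets.UTF_8), key);
        }
        final byte[] from = prefix.getBytes(StandardCharsets.UTF_8);
        // Upper bound: the prefix with its last byte incremented.
        // Valid UTF-8 never contains 0xFF, so no carry is needed in this sketch.
        final byte[] to = from.clone();
        to[to.length - 1]++;
        return new ArrayList<>(map.subMap(from, true, to, false).values());
    }

    public static void main(final String[] args) {
        // "b" is a prefix of the prefix "bb", but it sorts before "bb" and is not returned.
        System.out.println(prefixScan("bb", "a", "b", "c")); // prints []
        // With prefix "b", both "b" and "bb" fall into the range ["b", "c").
        System.out.println(prefixScan("b", "a", "b", "bb", "c")); // prints [b, bb]
    }
}
```

With `d` as the prefix the result is empty simply because nothing sorts at or after `d`. With `bb` the test also checks that a shorter key sharing the first byte is excluded.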

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
##########
@@ -104,6 +104,9 @@
      * prefix into the format in which the keys are stored in the stores needs to be passed to this method.
      * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
      * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not represent key order.
+     * For example, if the key type is Integer, and the store contains keys [1, 2, 11, 13],
+     * then running store.prefixScan(1, new IntegerSerializer()) would return [1] and not [1,11,13].

Review comment:
       ```suggestion
        * Since {@code prefixScan()} relies on byte lexicographical ordering and not on the ordering of the key type, results for some types might be unexpected.
        * For example, if the key type is {@code Integer}, and the store contains keys [1, 2, 11, 13],
        * then running {@code store.prefixScan(1, new IntegerSerializer())} will return [1] and not [1,11,13]. 
        * In contrast, if the key type is {@code String} the keys will be sorted [1, 11, 13, 2] in the store and {@code store.prefixScan(1, new StringSerializer())} will return [1,11,13]. 
        * In both cases {@code prefixScan()} starts the scan at 1 and stops at 2.
   ```







[GitHub] [kafka] vamossagar12 commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r575662025



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix(){
+        store = createKeyValueStore(driver.context());
+        final String value = "value";
+        final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, value));
+        entries.add(new KeyValue<>(2, value));
+        entries.add(new KeyValue<>(11, value));
+        entries.add(new KeyValue<>(13, value));
+
+        store.putAll(entries);
+        final KeyValueIterator<Integer, String> keysWithPrefix = store.prefixScan(1, new IntegerSerializer());

Review comment:
       I spent some more time on this today. In InMemoryKeyValueStore, we create a NavigableMap as the inner map, and Bytes (the key type of the inner map) implements Comparable. So when I insert the following keys:
   
   ```
   final List<KeyValue<Integer, String>> entries = new ArrayList<>();
   entries.add(new KeyValue<>(1, value));
   entries.add(new KeyValue<>(2, value));
   entries.add(new KeyValue<>(11, value));
   entries.add(new KeyValue<>(13, value));
   store.putAll(entries);
   ```
   my expectation was that the keys would be stored lexicographically, i.e. that the output of
   
   ```
   final KeyValueIterator<Integer, String> allKeys = store.all();
   while (allKeys.hasNext()) {
       System.out.println(allKeys.next().key);
   }
   ```
   would be `1 11 13 2`. But what gets returned is `1 2 11 13`, which happens to be the order in which we inserted them.
   
   To strengthen the point, when I insert the keys in the order:
   
   ```
   final List<KeyValue<Integer, String>> entries = new ArrayList<>();
   entries.add(new KeyValue<>(1, value));
   entries.add(new KeyValue<>(11, value));
   entries.add(new KeyValue<>(13, value));
   entries.add(new KeyValue<>(2, value));
   store.putAll(entries);
   ```
   and then fetch all keys, what I get back is still `1 2 11 13`, i.e. the natural ordering of the integer keys. This is precisely why prefix scan returns only one key in the test case that I pointed out. I also tried passing an explicit comparator to the TreeMap, but that didn't help either. What are your thoughts on this?
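
The insertion-order experiment above can be reproduced outside Kafka with a small plain-Java sketch. It assumes the inner map is a sorted map over serialized keys and that integers are encoded as 4 big-endian bytes. The class and method names are hypothetical, and only non-negative keys are considered.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

final class InsertionOrderSketch {

    // Inserts the keys in the given order and returns them in the map's iteration order.
    static List<Integer> iterationOrder(final int... keysInInsertionOrder) {
        final Comparator<byte[]> byteOrder = Arrays::compareUnsigned;
        final TreeMap<byte[], Integer> map = new TreeMap<>(byteOrder);
        for (final int key : keysInInsertionOrder) {
            // Assumed encoding: 4 bytes, most significant byte first.
            map.put(ByteBuffer.allocate(4).putInt(key).array(), key);
        }
        return new ArrayList<>(map.values());
    }

    public static void main(final String[] args) {
        System.out.println(iterationOrder(1, 2, 11, 13)); // prints [1, 2, 11, 13]
        System.out.println(iterationOrder(1, 11, 13, 2)); // prints [1, 2, 11, 13]
    }
}
```

In this sketch the iteration order depends only on the serialized bytes, not on the insertion order. For this fixed-width encoding of non-negative integers, byte order coincides with numeric order.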







[GitHub] [kafka] vamossagar12 commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r577363103



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix(){
+        store = createKeyValueStore(driver.context());
+        final String value = "value";
+        final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, value));
+        entries.add(new KeyValue<>(2, value));
+        entries.add(new KeyValue<>(11, value));
+        entries.add(new KeyValue<>(13, value));
+
+        store.putAll(entries);
+        final KeyValueIterator<Integer, String> keysWithPrefix = store.prefixScan(1, new IntegerSerializer());

Review comment:
       > The reason we get only `1` when we scan for prefix `1` is that the `IntegerSerializer` encodes each integer as four big-endian bytes, so `11` and `13` are each stored as a single value in the least significant byte, rather than digit by digit with `1` in the byte before the least significant byte and `1` or `3` in the least significant byte. With the former, the **byte** lexicographical order of `1 2 11 13` is `1 2 11 13`, which corresponds to the natural order of integers. With the latter, the **byte** lexicographical order of `1 2 11 13` would be `1 11 13 2`, which corresponds to the string lexicographical order. So the serializer determines the order of the entries, and the store always returns the entries in byte lexicographical order.
   > 
   > You will see something similar when you call `range(-1, 2)` on the in-memory state store in the unit test. You will get back an empty result, since `-1` is larger than `2` in byte lexicographical order when the `IntegerSerializer` is used. Also note the warning that is output, especially this part: `... or serdes that don't preserve ordering when lexicographically comparing the serialized bytes ...`
   > 
   > I think we should clearly state this limitation in the javadoc of `prefixScan()`, as we have done for `range()`, maybe with an example.
   > 
   > Currently, to get `prefixScan()` working for all types, we would need to do a complete scan (i.e. `all()`) followed by a filter, right?
   
   That is correct. That is the only way currently. 
   
   > 
   > Double checking: Is my understanding correct? @ableegoldman
   
   I think adding a warning similar to the one for `range()` would be good; I will do that as part of this PR. However, adding test cases for the integer serializer in this test class won't make sense. I will probably create another key-value store and add the tests there. Is that OK, @cadonna?
   
   







[GitHub] [kafka] guozhangwang commented on pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#issuecomment-789094844


   Cherry-picked to 2.8 as well cc @vvcephei 





[GitHub] [kafka] cadonna commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r576789945



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix(){
+        store = createKeyValueStore(driver.context());
+        final String value = "value";
+        final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, value));
+        entries.add(new KeyValue<>(2, value));
+        entries.add(new KeyValue<>(11, value));
+        entries.add(new KeyValue<>(13, value));
+
+        store.putAll(entries);
+        final KeyValueIterator<Integer, String> keysWithPrefix = store.prefixScan(1, new IntegerSerializer());

Review comment:
       The reason we get only `1` when we scan for prefix `1` is that the `IntegerSerializer` encodes each integer as four big-endian bytes, so `11` and `13` are each stored as a single value in the least significant byte, rather than digit by digit with `1` in the byte before the least significant byte and `1` or `3` in the least significant byte. With the former, the **byte** lexicographical order of `1 2 11 13` is `1 2 11 13`, which corresponds to the natural order of integers. With the latter, the **byte** lexicographical order of `1 2 11 13` would be `1 11 13 2`, which corresponds to the string lexicographical order. So the serializer determines the order of the entries, and the store always returns the entries in byte lexicographical order.
   
   You will see something similar when you call `range(-1, 2)` on the in-memory state store in the unit test. You will get back an empty result, since `-1` is larger than `2` in byte lexicographical order when the `IntegerSerializer` is used. Also note the warning that is output, especially this part: `... or serdes that don't preserve ordering when lexicographically comparing the serialized bytes ...`
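
A minimal sketch of the comparison behind the `range(-1, 2)` case, assuming a four-byte big-endian encoding and an unsigned byte-wise comparison. It uses only the JDK (Java 9+), and the class and method names are hypothetical:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration class; not part of Kafka.
public class NegativeKeySketch {

    // Four-byte big-endian encoding of an int.
    static byte[] intBytes(final int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // A positive result means `left` sorts after `right` in unsigned byte order.
    static int compareSerialized(final int left, final int right) {
        return Arrays.compareUnsigned(intBytes(left), intBytes(right));
    }

    public static void main(final String[] args) {
        // -1 serializes to FF FF FF FF and 2 to 00 00 00 02,
        // so the lower bound sorts after the upper bound.
        System.out.println(compareSerialized(-1, 2) > 0); // true
    }
}
```

With the bounds inverted in byte order, a range over the serialized keys has nothing to return.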
    
   I think we should clearly state this limitation in the javadoc of `prefixScan()`, as we have done for `range()`, maybe with an example.
   
   Currently, to get `prefixScan()` working for all types, we would need to do a complete scan (i.e. `all()`) followed by a filter, right? 
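
A rough sketch of such a scan-and-filter, with a plain `Map` standing in for the store and its key set standing in for `all()`. Treating "prefix" as a prefix of the decimal digits is an assumption made here for illustration, and the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Kafka.
public class ScanAndFilterSketch {

    // Visits every key (the stand-in for all()) and keeps those whose
    // decimal representation starts with the digits of `prefix`.
    static List<Integer> keysWithDecimalPrefix(final Map<Integer, String> store, final int prefix) {
        final String prefixDigits = Integer.toString(prefix);
        final List<Integer> matches = new ArrayList<>();
        for (final Integer key : store.keySet()) {
            if (Integer.toString(key).startsWith(prefixDigits)) {
                matches.add(key);
            }
        }
        return matches;
    }

    public static void main(final String[] args) {
        final Map<Integer, String> store = new TreeMap<>();
        for (final int key : new int[] {1, 2, 11, 13}) {
            store.put(key, "value");
        }
        System.out.println(keysWithDecimalPrefix(store, 1)); // [1, 11, 13]
    }
}
```

This touches every entry, so its cost grows with the size of the store rather than with the number of matches.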
   
   Double checking: Is my understanding correct? @ableegoldman 







[GitHub] [kafka] vamossagar12 commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r575374620



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##########
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefix(){
+        store = createKeyValueStore(driver.context());
+        final String value = "value";
+        final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, value));
+        entries.add(new KeyValue<>(2, value));
+        entries.add(new KeyValue<>(11, value));
+        entries.add(new KeyValue<>(13, value));
+
+        store.putAll(entries);
+        final KeyValueIterator<Integer, String> keysWithPrefix = store.prefixScan(1, new IntegerSerializer());

Review comment:
       @cadonna, I was thinking about this today, and my question is what exactly a prefix scan means for numeric keys. The way we have implemented it, it will work even for numeric keys as long as the keys can be retrieved in lexicographical order.
   So, if the prefix is `1` and it becomes `2` after incrementing, the range I need to search is [1, 2). If my map has 1, 2, 11, 13, then I should be able to fetch 1, 11, 13, as those come before 2 lexicographically. But when I test the underlying map from my test case, it returns 1, 2, 11, 13, and that is why the range scan fails.
   
   
   I guess the real problem is that the `KeyValueStore` defined in `InMemoryKeyValueStoreTest` uses `Integer` as the key type. But if somebody creates a store with integer keys and calls the `prefixScan` API, it might fail in the same way. WDYT?
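
A minimal sketch of the range-based prefix scan discussed here, using only the JDK (Java 9+). It assumes the scan covers the serialized range from the prefix (inclusive) to the incremented prefix (exclusive) over a map ordered by unsigned byte comparison. The `increment` helper and all other names are hypothetical and are not the store's actual code:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Kafka.
public class PrefixRangeSketch {

    static final Comparator<byte[]> UNSIGNED_ORDER = Arrays::compareUnsigned;

    // Four-byte big-endian encoding of an int.
    static byte[] intBytes(final int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // UTF-8 bytes of the decimal representation.
    static byte[] stringBytes(final int key) {
        return Integer.toString(key).getBytes(StandardCharsets.UTF_8);
    }

    // Returns the prefix with its last non-0xFF byte incremented and any
    // trailing 0xFF bytes reset to zero; this is the exclusive upper bound.
    static byte[] increment(final byte[] prefix) {
        final byte[] result = prefix.clone();
        for (int i = result.length - 1; i >= 0; i--) {
            if (result[i] != (byte) 0xFF) {
                result[i]++;
                return result;
            }
            result[i] = 0;
        }
        throw new IllegalArgumentException("prefix consists only of 0xFF bytes");
    }

    // Serializes the keys, then returns those in [prefix, increment(prefix)).
    static List<Integer> prefixScan(final int[] keys, final int prefix, final boolean asString) {
        final TreeMap<byte[], Integer> map = new TreeMap<>(UNSIGNED_ORDER);
        for (final int key : keys) {
            map.put(asString ? stringBytes(key) : intBytes(key), key);
        }
        final byte[] from = asString ? stringBytes(prefix) : intBytes(prefix);
        final byte[] to = increment(from);
        return new ArrayList<>(map.subMap(from, true, to, false).values());
    }

    public static void main(final String[] args) {
        final int[] keys = {1, 2, 11, 13};
        System.out.println(prefixScan(keys, 1, false)); // integer encoding: [1]
        System.out.println(prefixScan(keys, 1, true));  // string encoding:  [1, 11, 13]
    }
}
```

The same range logic returns different results depending only on how the keys are serialized, which matches the behavior seen in the test.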



