You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/06 14:03:08 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores

cadonna commented on a change in pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#discussion_r518682502



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
##########
@@ -107,4 +108,18 @@
      * @throws InvalidStateStoreException if the store is not initialized
      */
     long approximateNumEntries();
+
+    /**
+     * Get an iterator over keys which have the specified prefix. The type of the prefix can be different from that of
+     * the key. That's why, callers should also pass a serializer for the prefix to convert the prefix into the
+     * format in which the keys are stored underneath in the stores
+     * @param prefix The prefix.

Review comment:
       Please leave an empty line above to separate parameter description from the description of the method. 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
##########
@@ -107,4 +108,18 @@
      * @throws InvalidStateStoreException if the store is not initialized
      */
     long approximateNumEntries();
+
+    /**
+     * Get an iterator over keys which have the specified prefix. The type of the prefix can be different from that of
+     * the key. That's why, callers should also pass a serializer for the prefix to convert the prefix into the
+     * format in which the keys are stored underneath in the stores

Review comment:
       We usually put every sentence on its own line In javadocs. Unfortunately, we are not always consistent as you can see in the javadocs in this file.
   I guess this iterator has similar requirements as the other iterators, i.e., it must be closed, it must be safe from `java.util.ConcurrentModificationException`, and it must not return `null` values. If so, please add this requirements to the javadocs.
   ```suggestion
        * Return an iterator over all keys with the specified prefix. 
        * Since the type of the prefix can be different from that of the key, a serializer to convert the
        * prefix into the format in which the keys are stored in the stores needs to be passed to this method.  
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
##########
@@ -97,6 +98,11 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
+        return wrapped().prefixScan(prefix, prefixKeySerializer);
+    }

Review comment:
       ```suggestion
       public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, 
                                                                                       final PS prefixKeySerializer) {
           return wrapped().prefixScan(prefix, prefixKeySerializer);
       }
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
##########
@@ -97,6 +98,11 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       Unit test for this method is missing.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -229,6 +230,15 @@ public V delete(final K key) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       This method needs unit testing.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -103,6 +105,19 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
+        Objects.requireNonNull(prefix, "prefix cannot be null");
+        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
+
+        final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
+        final Bytes to = Bytes.increment(from);
+
+        return new DelegatingPeekingKeyValueIterator<>(
+                name,
+                new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet(), true));

Review comment:
       ```suggestion
           return new DelegatingPeekingKeyValueIterator<>(
               name,
               new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet(), true)
           );
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+class RocksDBPrefixIterator extends RocksDbIterator {
+    private byte[] rawPrefix;
+
+    RocksDBPrefixIterator(final String name,
+                          final RocksIterator newIterator,
+                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
+                          final Bytes prefix) {
+        super(name, newIterator, openIterators, true);
+        this.rawPrefix = prefix.get();
+        newIterator.seek(rawPrefix);
+    }
+
+    private boolean prefixEquals(final byte[] x, final byte[] y) {

Review comment:
       Please use meaningful names for the paramters instead of `x` and `y`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -308,6 +309,20 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       Please put the second parameter on a new line and align the two parameters like:
   ```suggestion
       public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, 
                                                                                       final PS prefixKeySerializer) {
   ```
   (On GitHub the suggestions seems a bit strange)

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -360,6 +361,118 @@ public void shouldPutAll() {
                 rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
     }
 
+    @Test
+    public void shouldReturnKeysWithGivenPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k1")),
+                stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_3")),
+                stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k2")),
+                stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_2")),
+                stringSerializer.serialize(null, "d")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k3")),
+                stringSerializer.serialize(null, "e")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_1")),
+                stringSerializer.serialize(null, "f")));
+
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan("prefix", stringSerializer);
+        final String[] valuesWithPrefix = new String[3];
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix[numberOfKeysReturned++] = new String(next.value);
+        }
+        // Since there are 3 keys prefixed with prefix, the count should be 3
+        assertEquals(3, numberOfKeysReturned);
+        // The order might seem inverted to the order in which keys were inserted, but since Rocksdb stores keys
+        // lexicographically, prefix_1 would still be the first key that is returned.
+        assertEquals(valuesWithPrefix[0], "f");
+        assertEquals(valuesWithPrefix[1], "d");
+        assertEquals(valuesWithPrefix[2], "b");
+
+        // Lastly, simple key value lookups should still work :)
+        assertEquals(
+                "c",
+                stringDeserializer.deserialize(
+                        null,
+                        rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "k2")))));
+

Review comment:
       Please remove empty line.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -360,6 +361,118 @@ public void shouldPutAll() {
                 rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
     }
 
+    @Test
+    public void shouldReturnKeysWithGivenPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k1")),
+                stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_3")),
+                stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k2")),
+                stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_2")),
+                stringSerializer.serialize(null, "d")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k3")),
+                stringSerializer.serialize(null, "e")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_1")),
+                stringSerializer.serialize(null, "f")));

Review comment:
       Please fix the indentation and the newlines also in the other tests as follows:
   ```suggestion
           entries.add(new KeyValue<>(
               new Bytes(stringSerializer.serialize(null, "k1")),
               stringSerializer.serialize(null, "a"))
           );
           entries.add(new KeyValue<>(
               new Bytes(stringSerializer.serialize(null, "prefix_3")),
               stringSerializer.serialize(null, "b"))
           );
           entries.add(new KeyValue<>(
               new Bytes(stringSerializer.serialize(null, "k2")),
               stringSerializer.serialize(null, "c"))
           );
           entries.add(new KeyValue<>(
               new Bytes(stringSerializer.serialize(null, "prefix_2")),
               stringSerializer.serialize(null, "d"))
           );
           entries.add(new KeyValue<>(
               new Bytes(stringSerializer.serialize(null, "k3")),
               stringSerializer.serialize(null, "e"))
           );
           entries.add(new KeyValue<>(
               new Bytes(stringSerializer.serialize(null, "prefix_1")),
               stringSerializer.serialize(null, "f"))
           );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -360,6 +361,118 @@ public void shouldPutAll() {
                 rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
     }
 
+    @Test
+    public void shouldReturnKeysWithGivenPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k1")),
+                stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_3")),
+                stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k2")),
+                stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_2")),
+                stringSerializer.serialize(null, "d")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k3")),
+                stringSerializer.serialize(null, "e")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_1")),
+                stringSerializer.serialize(null, "f")));
+
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan("prefix", stringSerializer);
+        final String[] valuesWithPrefix = new String[3];
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix[numberOfKeysReturned++] = new String(next.value);
+        }
+        // Since there are 3 keys prefixed with prefix, the count should be 3
+        assertEquals(3, numberOfKeysReturned);

Review comment:
       We prefer to use `assertThat()` in new test, because it is better readable. This line would then become:
    ```suggestion
           assertThat(numberOfKeysReturned, is(3));
   ```
   Please replace also the other verifications with `assertThat()` and fix the indentation (should be 4 spaces, not 8). 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -103,6 +105,19 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
       This method needs unit testing.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -360,6 +361,118 @@ public void shouldPutAll() {
                 rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
     }
 
+    @Test
+    public void shouldReturnKeysWithGivenPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k1")),
+                stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_3")),
+                stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k2")),
+                stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_2")),
+                stringSerializer.serialize(null, "d")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k3")),
+                stringSerializer.serialize(null, "e")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_1")),
+                stringSerializer.serialize(null, "f")));
+
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan("prefix", stringSerializer);
+        final String[] valuesWithPrefix = new String[3];
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix[numberOfKeysReturned++] = new String(next.value);
+        }
+        // Since there are 3 keys prefixed with prefix, the count should be 3
+        assertEquals(3, numberOfKeysReturned);
+        // The order might seem inverted to the order in which keys were inserted, but since Rocksdb stores keys
+        // lexicographically, prefix_1 would still be the first key that is returned.
+        assertEquals(valuesWithPrefix[0], "f");
+        assertEquals(valuesWithPrefix[1], "d");
+        assertEquals(valuesWithPrefix[2], "b");
+
+        // Lastly, simple key value lookups should still work :)
+        assertEquals(
+                "c",
+                stringDeserializer.deserialize(
+                        null,
+                        rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "k2")))));
+
+    }
+
+    @Test
+    public void shouldReturnUUIDsWithStringPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        final Serializer<UUID> uuidSerializer = Serdes.UUID().serializer();
+        final UUID uuid1 = UUID.randomUUID();
+        final UUID uuid2 = UUID.randomUUID();
+        final String prefix = uuid1.toString().substring(0, 4);
+        entries.add(new KeyValue<>(
+                new Bytes(uuidSerializer.serialize(null, uuid1)),
+                stringSerializer.serialize(null, "a")));
+
+        entries.add(new KeyValue<>(
+                new Bytes(uuidSerializer.serialize(null, uuid2)),
+                stringSerializer.serialize(null, "b")));
+
+

Review comment:
       Please remove empty line.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
##########
@@ -107,4 +108,18 @@
      * @throws InvalidStateStoreException if the store is not initialized
      */
     long approximateNumEntries();
+
+    /**
+     * Get an iterator over keys which have the specified prefix. The type of the prefix can be different from that of
+     * the key. That's why, callers should also pass a serializer for the prefix to convert the prefix into the
+     * format in which the keys are stored underneath in the stores
+     * @param prefix The prefix.
+     * @param prefixKeySerializer Serializer for the Prefix key type
+     * @param <PS> Prefix Serializer type
+     * @param <P> Prefix Type.
+     * @return The iterator for keys having the specified prefix.
+     */
+    default <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(P prefix, PS prefixKeySerializer) {

Review comment:
       I would move this method above `approximateNumEntries`, so that all methods that return iterators are in one block.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+class RocksDBPrefixIterator extends RocksDbIterator {

Review comment:
       This class needs unit testing.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -360,6 +361,118 @@ public void shouldPutAll() {
                 rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
     }
 
+    @Test
+    public void shouldReturnKeysWithGivenPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k1")),
+                stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_3")),
+                stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k2")),
+                stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_2")),
+                stringSerializer.serialize(null, "d")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "k3")),
+                stringSerializer.serialize(null, "e")));
+        entries.add(new KeyValue<>(
+                new Bytes(stringSerializer.serialize(null, "prefix_1")),
+                stringSerializer.serialize(null, "f")));
+
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan("prefix", stringSerializer);
+        final String[] valuesWithPrefix = new String[3];
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix[numberOfKeysReturned++] = new String(next.value);

Review comment:
       I would rather use a `List` instead of the array and then verify if the length of the list is three. That will give us a more meaningful assertion error than the `IndexOutOfBoundsException` that we would get with the array in case of failure.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -256,6 +256,11 @@ public void close() {
             oldColumnFamily.close();
             newColumnFamily.close();
         }
+
+        @Override
+        public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
+            throw new UnsupportedOperationException();
+        }

Review comment:
       Why is the prefix scan not supported here? The timstamped stores are the default stores in Kafka Streams.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -273,6 +275,43 @@ public void testDrivingStatefulTopology() {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testPrefixScanStatefulTopology() {

Review comment:
       You do not need to test the prefix scan by using the topology test driver because you actually do not need a topology. Your addition is limited to the state stores and those you should unit test. Furthermore, this test is misplaced because you did not change anything in `ProcessorTopology`. Please remove this test and add unit tests where I indicated the need for them.
   
   Although, you will find a lot of test named `test*`, we name new tests with `should*` followed by the expected result, e.g., `shouldThrowIllegalStateException` or `shouldGetRecordsWithPrefixedKey`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org