Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/17 12:38:27 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores

cadonna commented on a change in pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#discussion_r544411701



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
##########
@@ -434,6 +435,23 @@ public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
         verify(inner);
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        final StringSerializer stringSerializer = new StringSerializer();
+        expect(inner.prefixScan(KEY, stringSerializer))
+                .andReturn(new KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+        init();
+
+        final KeyValueIterator<String, String> iterator = metered.prefixScan(KEY, stringSerializer);
+        assertThat(iterator.next().value, equalTo(VALUE));
+        assertFalse(iterator.hasNext());

Review comment:
       This is not strictly necessary: it only checks the mock result that you provide yourself, which has nothing to do with the code under test.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -627,6 +645,11 @@ public void addToBatch(final byte[] key,
         public void close() {
             columnFamily.close();
         }
+
+        @Override
+        public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
+            return new RocksDBPrefixIterator(name, db.newIterator(columnFamily), openIterators, prefix);
+        }

Review comment:
       Sorry if I did not notice this before. Could you move this method up, before `approximateNumEntries()`, so that all operations are in one block?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
##########
@@ -110,6 +110,14 @@ private StateStoreMetrics() {}
     private static final String RANGE_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + RANGE_DESCRIPTION;
     private static final String RANGE_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + RANGE_DESCRIPTION;
 
+    private static final String PREFIX_SCAN = "prefixScan";
+    private static final String PREFIX_SCAN_DESCRIPTION = "calls to prefixScan";

Review comment:
       Please change `prefixScan` to `prefix-scan`. We do not use camel case for metric names. See `put-all` or `put-if-absent` for examples.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -229,6 +232,15 @@ public V delete(final K key) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
+
+        return new MeteredKeyValueIterator(
+            wrapped().prefixScan(prefix, prefixKeySerializer),
+            prefixScanSensor
+        );

Review comment:
       nit: 
   ```suggestion
           return new MeteredKeyValueIterator(wrapped().prefixScan(prefix, prefixKeySerializer), prefixScanSensor);
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
##########
@@ -434,6 +435,23 @@ public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
         verify(inner);
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        final StringSerializer stringSerializer = new StringSerializer();
+        expect(inner.prefixScan(KEY, stringSerializer))
+                .andReturn(new KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));

Review comment:
       nit: Please use 4 instead of 8 spaces indentation.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
##########
@@ -284,6 +284,10 @@ public boolean isEmpty() {
         return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), true);
     }
 
+    synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to, final boolean toInclusive) {
+        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, toInclusive), true);
+    }

Review comment:
       nit: I would just add a `toInclusive` parameter to the existing method instead of creating an overload.
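
       As a rough illustration of the single-method approach, the sketch below uses a plain `TreeSet<String>` in place of the cache's key set. The class `KeyRangeSketch` and its `String` keys are hypothetical and not part of the PR; only `NavigableSet.subSet(from, fromInclusive, to, toInclusive)` is the real JDK call that the diff also relies on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical stand-in for the key-range lookup; not the actual NamedCache code.
class KeyRangeSketch {

    // One method serves both callers: the lower bound is always inclusive,
    // and the caller decides whether the upper bound is part of the result.
    static Iterator<String> keyRange(final NavigableSet<String> keys,
                                     final String from,
                                     final String to,
                                     final boolean toInclusive) {
        return keys.subSet(from, true, to, toInclusive).iterator();
    }

    // Helper for the demo: copies the remaining elements into a list.
    static List<String> drain(final Iterator<String> iterator) {
        final List<String> result = new ArrayList<>();
        iterator.forEachRemaining(result::add);
        return result;
    }

    public static void main(final String[] args) {
        final NavigableSet<String> keys = new TreeSet<>(Arrays.asList("a", "b", "c"));
        System.out.println(drain(keyRange(keys, "a", "c", true)));
        System.out.println(drain(keyRange(keys, "a", "c", false)));
    }
}
```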

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
##########
@@ -307,6 +315,26 @@ public static Sensor rangeSensor(final String threadId,
         );
     }
 
+    public static Sensor prefixScanSensor(final String threadId,
+                                     final String taskId,
+                                     final String storeType,
+                                     final String storeName,
+                                     final StreamsMetricsImpl streamsMetrics) {
+        return throughputAndLatencySensor(

Review comment:
       For new sensors like this, we only need to consider built-in metrics version `LATEST`. Hence, you should not call `throughputAndLatencySensor()`, but only the parts that are relevant for `LATEST` and not those for `FROM_0100_TO_24`. You also need to adapt the corresponding unit test accordingly. See KIP-444 for more details.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -201,6 +201,14 @@ public MemoryLRUCacheBytesIterator reverseAll(final String namespace) {
         return new MemoryLRUCacheBytesIterator(cache.reverseAllKeys(), cache);
     }
 
+    public MemoryLRUCacheBytesIterator prefixScan(final String namespace, final Bytes from, final Bytes to) {
+        final NamedCache cache = getCache(namespace);
+        if (cache == null) {
+            return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics));
+        }
+        return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to, false), cache);

Review comment:
       If you take up my comment in `NamedCache`, you could also just call `range()` here. Alternatively, if you want to be more descriptive, `prefixScan()` and `range()` in this class could both call a private overload of `range()` with a flag that includes or excludes the end of the range. That would deduplicate code.
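
       The private-overload variant could look roughly like the sketch below. `RangeDedupSketch`, its `String` keys, and the `List` return type are assumptions made to keep the example self-contained; the real class works on `Bytes` and returns a `MemoryLRUCacheBytesIterator`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for the cache class; names and types are illustrative only.
class RangeDedupSketch {
    private final TreeSet<String> keys = new TreeSet<>();

    void put(final String key) {
        keys.add(key);
    }

    // Closed range: both bounds belong to the result.
    List<String> range(final String from, final String to) {
        return range(from, to, true);
    }

    // Prefix scan: 'to' is the first key past the prefix, so it is excluded.
    List<String> prefixScan(final String from, final String to) {
        return range(from, to, false);
    }

    // Shared implementation; the flag is the only difference between the two callers.
    private List<String> range(final String from, final String to, final boolean toInclusive) {
        return new ArrayList<>(keys.subSet(from, true, to, toInclusive));
    }

    public static void main(final String[] args) {
        final RangeDedupSketch cache = new RangeDedupSketch();
        cache.put("k1");
        cache.put("k2");
        cache.put("k3");
        System.out.println(cache.range("k1", "k3"));
        System.out.println(cache.prefixScan("k1", "k3"));
    }
}
```

       Both public methods stay descriptive for callers, while the bound handling lives in one place.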

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+class RocksDBPrefixIterator extends RocksDbIterator {

Review comment:
       I do not completely understand why we need a dedicated iterator for the prefix scan. We could just as well extend `RocksDBRangeIterator` so that it optionally includes or excludes the end of the range. That is possible because these iterator implementations are internal, and the public API does not care which iterator is used as long as it implements the `KeyValueIterator` interface and behaves correctly. Extending and re-using `RocksDBRangeIterator` would leave less code to maintain. Note that I agree we need the public method `prefixScan()`, but what the implementation uses internally is not relevant for the KIP as long as it is correct. Did I miss something that requires a separate iterator?
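
       To make the idea concrete, here is a minimal sketch of a range iterator whose upper bound can be inclusive or exclusive. It wraps an ordinary sorted `java.util.Iterator<String>` rather than a `RocksIterator`, and the class name `BoundedRangeIterator` is hypothetical; it only illustrates the flag, not how `RocksDBRangeIterator` is actually written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeSet;

// Hypothetical range iterator over an already-positioned, sorted source.
class BoundedRangeIterator implements Iterator<String> {
    private final Iterator<String> sorted;
    private final String to;
    private final boolean toInclusive;
    private String next;

    BoundedRangeIterator(final Iterator<String> sorted, final String to, final boolean toInclusive) {
        this.sorted = sorted;
        this.to = to;
        this.toInclusive = toInclusive;
        advance();
    }

    // Loads the next element, or null once the upper bound has been passed.
    private void advance() {
        next = null;
        if (sorted.hasNext()) {
            final String candidate = sorted.next();
            final int cmp = candidate.compareTo(to);
            if (cmp < 0 || (cmp == 0 && toInclusive)) {
                next = candidate;
            }
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public String next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        final String result = next;
        advance();
        return result;
    }

    // Helper for the demo: copies the remaining elements into a list.
    static List<String> collect(final Iterator<String> iterator) {
        final List<String> result = new ArrayList<>();
        iterator.forEachRemaining(result::add);
        return result;
    }

    public static void main(final String[] args) {
        final TreeSet<String> keys = new TreeSet<>(Arrays.asList("a", "b", "c", "d"));
        // tailSet(...).iterator() plays the role of seeking to the lower bound.
        System.out.println(collect(new BoundedRangeIterator(keys.tailSet("b", true).iterator(), "c", true)));
        System.out.println(collect(new BoundedRangeIterator(keys.tailSet("b", true).iterator(), "c", false)));
    }
}
```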

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
##########
@@ -359,6 +361,34 @@ public void shouldReverseIterateOverRange() {
         ), results);
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {

Review comment:
       In all the tests for the prefix scan you should also verify boundary conditions. For example, for the prefix `abcd`, verify that `abce` is not matched, since it is the first key that should not be matched.
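
       A boundary check of this kind might be sketched as follows. The example runs against a plain `TreeMap<String, String>` rather than a real state store, and `PrefixBoundarySketch`, `upperBound()`, and `prefixScan()` are hypothetical helpers. It assumes a non-empty prefix whose last character is not `Character.MAX_VALUE`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical prefix scan over a sorted map, used only to illustrate the boundary cases.
class PrefixBoundarySketch {

    // Smallest string that is greater than every string starting with 'prefix'.
    // Assumes a non-empty prefix whose last char is not Character.MAX_VALUE.
    static String upperBound(final String prefix) {
        final char last = prefix.charAt(prefix.length() - 1);
        return prefix.substring(0, prefix.length() - 1) + (char) (last + 1);
    }

    // Returns all keys in [prefix, upperBound(prefix)), in sorted order.
    static List<String> prefixScan(final TreeMap<String, String> store, final String prefix) {
        return new ArrayList<>(store.subMap(prefix, true, upperBound(prefix), false).keySet());
    }

    public static void main(final String[] args) {
        final TreeMap<String, String> store = new TreeMap<>();
        for (final String key : new String[] {"abcc", "abcd", "abcda", "abcdz", "abce", "abd"}) {
            store.put(key, "value-" + key);
        }
        final List<String> matched = prefixScan(store, "abcd");
        System.out.println(matched);
        // Boundary keys on both sides of the prefix range must be excluded.
        if (matched.contains("abcc") || matched.contains("abce")) {
            throw new AssertionError("boundary key leaked into the prefix scan");
        }
    }
}
```

       Checking `abcc` as well as `abce` covers the key just below the range in addition to the first key above it.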



