Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/13 22:03:55 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #11598: feat: Implement range and scan queries

vvcephei commented on a change in pull request #11598:
URL: https://github.com/apache/kafka/pull/11598#discussion_r768122088



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -175,6 +194,7 @@ public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         if (from == null && to == null) {
             return getKeyValueIterator(map.keySet(), forward);
         } else if (from == null) {
+            System.out.println("-----------> range upper bound");

Review comment:
       Looks like this debug print was left over.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,85 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    collectExecutionInfo,
+                    this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runRangeQuery(
+            final Query query, final PositionBound positionBound, final boolean collectExecutionInfo) {
+
+        final QueryResult<R> result;
+        final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+        final RangeQuery rawRangeQuery;
+        if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(keyBytes(typedQuery.getLowerBound().get()),
+                    keyBytes(typedQuery.getUpperBound().get()));
+        } else if (typedQuery.getLowerBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+        } else if (typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }

Review comment:
       At the risk of being too fancy, what do you think about this instead?
   ```suggestion
           rawRangeQuery = RangeQuery.withRange(typedQuery.getLowerBound().map(this::keyBytes),
               typedQuery.getUpperBound().map(this::keyBytes));
   ```
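
To illustrate the idea outside of Kafka, here is a minimal, JDK-only sketch of why mapping each `Optional` bound can replace the four-way branch. Everything in it is hypothetical: `RawRange` stands in for a range type whose factory accepts `Optional` bounds (which the suggestion above assumes `RangeQuery.withRange` would do), and `keyBytes` is a toy serializer, not the store's real one.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Optional;

final class OptionalMapSketch {

    /** Hypothetical raw range whose bounds may each be absent. Not a Kafka class. */
    static final class RawRange {
        final Optional<byte[]> lower;
        final Optional<byte[]> upper;

        RawRange(final Optional<byte[]> lower, final Optional<byte[]> upper) {
            this.lower = lower;
            this.upper = upper;
        }

        /** Two ranges match when both bounds agree on presence and content. */
        boolean sameAs(final RawRange other) {
            return sameBound(lower, other.lower) && sameBound(upper, other.upper);
        }

        private static boolean sameBound(final Optional<byte[]> a, final Optional<byte[]> b) {
            return a.isPresent() == b.isPresent()
                && (!a.isPresent() || Arrays.equals(a.get(), b.get()));
        }
    }

    /** Toy serializer standing in for the store's keyBytes(...) helper (assumption). */
    static byte[] keyBytes(final Integer key) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(key).array();
    }

    /** The original shape: one branch per combination of present/absent bounds. */
    static RawRange withBranches(final Optional<Integer> lower, final Optional<Integer> upper) {
        if (lower.isPresent() && upper.isPresent()) {
            return new RawRange(Optional.of(keyBytes(lower.get())), Optional.of(keyBytes(upper.get())));
        } else if (lower.isPresent()) {
            return new RawRange(Optional.of(keyBytes(lower.get())), Optional.empty());
        } else if (upper.isPresent()) {
            return new RawRange(Optional.empty(), Optional.of(keyBytes(upper.get())));
        } else {
            return new RawRange(Optional.empty(), Optional.empty());
        }
    }

    /** The suggested shape: map() serializes a present bound and keeps an absent one empty. */
    static RawRange withMap(final Optional<Integer> lower, final Optional<Integer> upper) {
        return new RawRange(
            lower.map(OptionalMapSketch::keyBytes),
            upper.map(OptionalMapSketch::keyBytes)
        );
    }

    public static void main(final String[] args) {
        final Optional<Integer> none = Optional.empty();
        final Optional<Integer> one = Optional.of(1);
        // Prints whether the two shapes agree for a lower-bound-only range.
        System.out.println(withMap(one, none).sameAs(withBranches(one, none)));
    }
}
```

The one-liner only works if the range factory takes `Optional` arguments; with a factory that takes plain keys, the branches (or an `orElse(null)` variant) would still be needed.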

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -279,34 +344,34 @@ public static void before()
                 final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
                 assertThat(recordMetadata.hasOffset(), is(true));
                 INPUT_POSITION.withComponent(
-                    recordMetadata.topic(),
-                    recordMetadata.partition(),
-                    recordMetadata.offset()
+                        recordMetadata.topic(),
+                        recordMetadata.partition(),
+                        recordMetadata.offset()
                 );
             }
         }
 
         assertThat(INPUT_POSITION, equalTo(
-            Position
-                .emptyPosition()
-                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
-                .withComponent(INPUT_TOPIC_NAME, 1, 0L)
+                Position
+                        .emptyPosition()
+                        .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                        .withComponent(INPUT_TOPIC_NAME, 1, 1L)
         ));
     }
 
     @Before
     public void beforeTest() {
         final StoreSupplier<?> supplier = storeToTest.supplier();
         final Properties streamsConfig = streamsConfiguration(
-            cache,
-            log,
-            storeToTest.name()
+                cache,
+                log,
+                storeToTest.name()

Review comment:
       I'm sorry to sound picky, but do you mind backing out these formatting changes? I'm only concerned because there are a lot of them. Otherwise, we'll just have duelling autoformat results between commits.
   
   For reference, I have indents and continuation indents both set to `4` for this project.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -426,6 +481,68 @@ public void verifyStore() {
             shouldHandlePingQuery();
             shouldCollectExecutionInfo();
             shouldCollectExecutionInfoUnderFailure();
+
+            if (storeToTest.keyValue()) {
+                if (storeToTest.timestamped()) {
+                    shouldHandleRangeQuery(
+                            Optional.of(1),
+                            Optional.of(3),
+                            (Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value,
+                            mkSet(1, 2, 3)
+
+                    );
+                    shouldHandleRangeQuery(
+                            Optional.of(1),
+                            Optional.empty(),
+                            (Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value,
+                            mkSet(1, 2, 3)
+
+                    );
+                    shouldHandleRangeQuery(
+                            Optional.empty(),
+                            Optional.of(1),
+                            (Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value,
+                            mkSet(0, 1)
+
+                    );
+                    shouldHandleRangeQuery(
+                            Optional.empty(),
+                            Optional.empty(),
+                            (Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value,
+                            mkSet(0, 1, 2, 3)
+
+                    );
+
+                } else {
+                    shouldHandleRangeQuery(
+                            Optional.of(1),
+                            Optional.of(3),
+                            Function.identity(),
+                            mkSet(1, 2, 3)
+                    );
+                    shouldHandleRangeQuery(
+                            Optional.of(1),
+                            Optional.empty(),
+                            Function.identity(),
+                            mkSet(1, 2, 3)
+
+                    );
+                    shouldHandleRangeQuery(
+                            Optional.empty(),
+                            Optional.of(1),
+                            Function.identity(),
+                            mkSet(0, 1)
+
+                    );
+                    shouldHandleRangeQuery(
+                            Optional.empty(),
+                            Optional.empty(),
+                            Function.identity(),
+                            mkSet(0, 1, 2, 3)
+
+                    );
+                }

Review comment:
       Sorry, not trying to golf here, but since everything between these branches is the same except the extractor function, I'm wondering if we should factor the checks out into a separate method:
   ```suggestion
                   if (storeToTest.timestamped()) {
                       shouldHandleRangeQueries(
                           (Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value
                       );
                   } else {
                       shouldHandleRangeQueries(Function.identity());
                   }
                   
   ...
       private <T> void shouldHandleRangeQueries(final Function<T, Integer> extractor) {
                       shouldHandleRangeQuery(
                               Optional.of(1),
                               Optional.of(3),
                               extractor,
                               mkSet(1, 2, 3)
   
                       );
                       shouldHandleRangeQuery(
                               Optional.of(1),
                               Optional.empty(),
                               extractor,
                               mkSet(1, 2, 3)
   
                       );
                       shouldHandleRangeQuery(
                               Optional.empty(),
                               Optional.of(1),
                               extractor,
                               mkSet(0, 1)
   
                       );
                       shouldHandleRangeQuery(
                               Optional.empty(),
                               Optional.empty(),
                               extractor,
                               mkSet(0, 1, 2, 3)
   
                       );
       }
   ```
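
For anyone who wants to try the factoring in isolation, here is a self-contained sketch of the same pattern using only the JDK. It is not the integration test: the `TreeMap` store, the `Stamped` class (a stand-in for `ValueAndTimestamp`), `rangeValues`, and `setOf` are all hypothetical, and the range is assumed to be inclusive on both ends, which is what the expected sets above imply.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;

final class RangeQueryChecksSketch {

    /** Hypothetical stand-in for a timestamped value; not Kafka's ValueAndTimestamp. */
    static final class Stamped {
        final int value;
        final long timestamp;

        Stamped(final int value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static Set<Integer> setOf(final Integer... values) {
        return new HashSet<>(Arrays.asList(values));
    }

    /** Collects extracted values whose keys fall in the inclusive, optionally open-ended range. */
    static <T> Set<Integer> rangeValues(final TreeMap<Integer, T> store,
                                        final Optional<Integer> lower,
                                        final Optional<Integer> upper,
                                        final Function<T, Integer> extractor) {
        final Set<Integer> result = new HashSet<>();
        for (final Map.Entry<Integer, T> entry : store.entrySet()) {
            final int key = entry.getKey();
            final boolean aboveLower = !lower.isPresent() || key >= lower.get();
            final boolean belowUpper = !upper.isPresent() || key <= upper.get();
            if (aboveLower && belowUpper) {
                result.add(extractor.apply(entry.getValue()));
            }
        }
        return result;
    }

    /** Single check: fails with an AssertionError when the range result differs. */
    static <T> void shouldHandleRangeQuery(final TreeMap<Integer, T> store,
                                           final Optional<Integer> lower,
                                           final Optional<Integer> upper,
                                           final Function<T, Integer> extractor,
                                           final Set<Integer> expected) {
        final Set<Integer> actual = rangeValues(store, lower, upper, extractor);
        if (!actual.equals(expected)) {
            throw new AssertionError("expected " + expected + " but got " + actual);
        }
    }

    /** The factored-out helper: the four checks differ only in the extractor passed in. */
    static <T> void shouldHandleRangeQueries(final TreeMap<Integer, T> store,
                                             final Function<T, Integer> extractor) {
        shouldHandleRangeQuery(store, Optional.of(1), Optional.of(3), extractor, setOf(1, 2, 3));
        shouldHandleRangeQuery(store, Optional.of(1), Optional.empty(), extractor, setOf(1, 2, 3));
        shouldHandleRangeQuery(store, Optional.empty(), Optional.of(1), extractor, setOf(0, 1));
        shouldHandleRangeQuery(store, Optional.empty(), Optional.empty(), extractor, setOf(0, 1, 2, 3));
    }

    public static void main(final String[] args) {
        final TreeMap<Integer, Integer> plain = new TreeMap<>();
        final TreeMap<Integer, Stamped> stamped = new TreeMap<>();
        for (int i = 0; i < 4; i++) {
            plain.put(i, i);
            stamped.put(i, new Stamped(i, 100L + i));
        }
        shouldHandleRangeQueries(plain, Function.identity());
        shouldHandleRangeQueries(stamped, s -> s.value);
        System.out.println("all range checks passed");
    }
}
```

With this shape, covering another store flavor would mean one more call with a different extractor rather than four more copied checks.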

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -92,11 +93,29 @@ Position getPosition() {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <R> QueryResult<R> query(
         final Query<R> query,
         final PositionBound positionBound,
         final boolean collectExecutionInfo) {
 
+        if (query instanceof RangeQuery) {
+            final RangeQuery<Bytes, byte[]> typedQuery = (RangeQuery<Bytes, byte[]>) query;
+            final KeyValueIterator<Bytes, byte[]> keyValueIterator;
+            if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) {
+                keyValueIterator = this.range(typedQuery.getLowerBound().get(), typedQuery.getUpperBound().get());
+            } else if (typedQuery.getLowerBound().isPresent()) {
+                keyValueIterator = this.range(typedQuery.getLowerBound().get(), null);
+            } else if (typedQuery.getUpperBound().isPresent()) {
+                keyValueIterator = this.range(null, typedQuery.getUpperBound().get());
+            } else {
+                keyValueIterator = this.range(null, null);
+            }

Review comment:
       It looks like this could be simplified. WDYT?
   
   ```suggestion
               keyValueIterator = this.range(typedQuery.getLowerBound().orElse(null), typedQuery.getUpperBound().orElse(null));
   ```
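
As a rough illustration of why the collapse is safe, the sketch below uses a plain `TreeMap` rather than the real store. The `range` method here is hypothetical; it only mimics the convention visible in the diff above, where a `null` bound means "unbounded on that side". Inclusive bounds are an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

final class OrElseNullSketch {

    /** Hypothetical range lookup where a null bound means "no bound on that side". */
    static List<Integer> range(final NavigableMap<Integer, String> map,
                               final Integer from,
                               final Integer to) {
        final NavigableMap<Integer, String> view;
        if (from == null && to == null) {
            view = map;
        } else if (from == null) {
            view = map.headMap(to, true);
        } else if (to == null) {
            view = map.tailMap(from, true);
        } else {
            view = map.subMap(from, true, to, true);
        }
        return new ArrayList<>(view.keySet());
    }

    /** An absent Optional bound turns into null, so no branching is needed at the call site. */
    static List<Integer> query(final NavigableMap<Integer, String> map,
                               final Optional<Integer> lower,
                               final Optional<Integer> upper) {
        return range(map, lower.orElse(null), upper.orElse(null));
    }

    public static void main(final String[] args) {
        final NavigableMap<Integer, String> map = new TreeMap<>();
        for (int i = 0; i < 4; i++) {
            map.put(i, "v" + i);
        }
        // Prints the keys selected by an upper-bound-only query.
        System.out.println(query(map, Optional.empty(), Optional.of(1)));
    }
}
```

The branching does not disappear; it moves into the one place that already has to interpret `null` bounds, so the query handler does not repeat it.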



