Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/04 22:27:56 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #8976: KIP-617: Allow Kafka Streams State Stores to be iterated backwards

ableegoldman commented on a change in pull request #8976:
URL: https://github.com/apache/kafka/pull/8976#discussion_r462687337



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
##########
@@ -177,17 +197,36 @@ public V fetch(final K key,
             return wrapped().fetch(from, to, timeFrom, timeTo);
         }
 
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFetch(final K from,
+                                                      final K to,

Review comment:
       nit: parameters misaligned

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/BytesRangeValidator.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BytesRangeValidator {

Review comment:
      Do we really need a new class for this one static method? Looking at similar checks elsewhere, most of them seem to be done inline, which is nice since you can tell at a glance what is and isn't valid input to a method.
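For comparison, here is a minimal sketch of the inline style the comment prefers. It uses a `java.util.TreeMap` stand-in rather than Kafka's `Bytes`-keyed stores. `InlineRangeCheckSketch` is a hypothetical name, and returning an empty iterator for an inverted range is an assumed policy, not necessarily what this PR does.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a key-value store; not a Kafka Streams class.
class InlineRangeCheckSketch {
    private final NavigableMap<String, String> data = new TreeMap<>();

    void put(final String key, final String value) {
        data.put(key, value);
    }

    Iterator<String> range(final String from, final String to) {
        // Inline validation: the rule for valid input is visible right here in the method.
        // Assumed policy: an inverted range (from > to) yields an empty iterator.
        if (from.compareTo(to) > 0) {
            return Collections.emptyIterator();
        }
        // Both bounds inclusive, iterated in ascending key order.
        return data.subMap(from, true, to, true).values().iterator();
    }

    public static void main(final String[] args) {
        final InlineRangeCheckSketch store = new InlineRangeCheckSketch();
        store.put("a", "1");
        store.put("b", "2");
        store.put("c", "3");
        store.range("a", "b").forEachRemaining(System.out::println); // prints 1 then 2
        System.out.println(store.range("c", "a").hasNext());         // prints false
    }
}
```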

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -310,12 +421,11 @@ public synchronized void close() {
         );
         if (!suppressed.isEmpty()) {
             throwSuppressed("Caught an exception while closing caching window store for store " + name(),
-                            suppressed);
+                suppressed);
         }
     }
 
 
-
     private class CacheIteratorWrapper implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {

Review comment:
      Just for some context, this iterator wrapper exists to fix a performance issue in the segmented stores. Due to the byte layout, we were ending up iterating over records that weren't actually in the query range when going from one "segment" to another. Only the RocksDB window (and session) stores are segmented in this way, hence the check for `wrapped().persistent()` before applying this wrapper. Took me a second to remember what that was all about.
   Anyway, this wrapper makes sure we only iterate over valid records within a segment, so I'm thinking we might need to mess with the logic that checks when we're going from one segment to another, since we're going through the segments in reverse order.
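To make the "valid records within a segment" idea concrete, here is a sketch of clamping a fetch's time range to a single segment. It rests on assumptions the PR does not confirm: fixed-size segments, non-negative timestamps, and `segmentId = timestamp / segmentInterval`. `SegmentBoundsSketch` and `clampToSegment` are hypothetical names.

```java
// Hypothetical helper, not Kafka's API. Assumes fixed-size segments, non-negative
// timestamps, and segmentId = timestamp / segmentInterval.
class SegmentBoundsSketch {
    static long segmentId(final long timestamp, final long segmentInterval) {
        return timestamp / segmentInterval;
    }

    // Clamp the query's [timeFrom, timeTo] to the slice that lies inside one segment,
    // so a scan of that segment never reaches records belonging to a neighbouring one.
    static long[] clampToSegment(final long segmentId, final long segmentInterval,
                                 final long timeFrom, final long timeTo) {
        final long segmentStart = segmentId * segmentInterval;
        final long segmentEnd = segmentStart + segmentInterval - 1;
        return new long[] {Math.max(timeFrom, segmentStart), Math.min(timeTo, segmentEnd)};
    }

    public static void main(final String[] args) {
        final long interval = 100L;
        final long timeFrom = 50L;
        final long timeTo = 250L;
        // Forward order: oldest segment first.
        for (long id = segmentId(timeFrom, interval); id <= segmentId(timeTo, interval); id++) {
            final long[] b = clampToSegment(id, interval, timeFrom, timeTo);
            System.out.println("segment " + id + ": [" + b[0] + ", " + b[1] + "]");
        }
    }
}
```

For `timeFrom = 50`, `timeTo = 250` and an interval of 100, the main method prints the slices [50, 99], [100, 199] and [200, 250]. A backward fetch would use the same slices but visit them last to first, which is where the segment-transition logic needs attention.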

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -310,12 +421,11 @@ public synchronized void close() {
         );
         if (!suppressed.isEmpty()) {
             throwSuppressed("Caught an exception while closing caching window store for store " + name(),
-                            suppressed);
+                suppressed);
         }
     }
 
 
-
     private class CacheIteratorWrapper implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {

Review comment:
      Have you looked through this class? I think it might need some additional changes for the reverse case. For example, in `#getNextSegmentIterator` there's a check for `currentSegmentId > lastSegmentId`, which seems to indicate the end of the iteration. But we'd be iterating over the segments backwards as well, right?
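The direction-aware termination check the comment hints at might look like the following sketch. `SegmentCursorSketch`, `advance`, `exhausted` and `visitOrder` are hypothetical names. Only `currentSegmentId > lastSegmentId` comes from the comment itself; the mirrored `currentSegmentId < firstSegmentId` check is an assumption about what the reverse case needs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical cursor over segment ids; names only loosely echo the ones in the comment.
class SegmentCursorSketch {
    private final long firstSegmentId;
    private final long lastSegmentId;
    private final boolean reverse;
    private long currentSegmentId;

    SegmentCursorSketch(final long firstSegmentId, final long lastSegmentId, final boolean reverse) {
        this.firstSegmentId = firstSegmentId;
        this.lastSegmentId = lastSegmentId;
        this.reverse = reverse;
        // A backward fetch starts at the newest segment instead of the oldest.
        this.currentSegmentId = reverse ? lastSegmentId : firstSegmentId;
    }

    long current() {
        return currentSegmentId;
    }

    // Step to the next segment in iteration order.
    void advance() {
        currentSegmentId += reverse ? -1 : 1;
    }

    // Forward: done once we move past lastSegmentId (the existing check).
    // Backward: done once we move below firstSegmentId instead.
    boolean exhausted() {
        return reverse ? currentSegmentId < firstSegmentId : currentSegmentId > lastSegmentId;
    }

    static List<Long> visitOrder(final long first, final long last, final boolean reverse) {
        final List<Long> ids = new ArrayList<>();
        for (SegmentCursorSketch c = new SegmentCursorSketch(first, last, reverse); !c.exhausted(); c.advance()) {
            ids.add(c.current());
        }
        return ids;
    }

    public static void main(final String[] args) {
        System.out.println(visitOrder(2, 4, false)); // [2, 3, 4]
        System.out.println(visitOrder(2, 4, true));  // [4, 3, 2]
    }
}
```

Keeping the start point, step and end condition behind one `reverse` flag means the forward path stays exactly as before, and the reverse path cannot accidentally reuse the forward-only `> lastSegmentId` check.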

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
##########
@@ -136,34 +176,65 @@
      * <p>
      * This iterator must be closed after use.
      *
-     * @param from      the first key in the range
-     * @param to        the last key in the range
-     * @param fromTime  time range start (inclusive)
-     * @param toTime    time range end (inclusive)
+     * @param from     the first key in the range
+     * @param to       the last key in the range
+     * @param fromTime time range start (inclusive)
+     * @param toTime   time range end (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException If {@code null} is used for any key.
-     * @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
+     * @throws NullPointerException       If {@code null} is used for any key.
+     * @throws IllegalArgumentException   if duration is negative or can't be represented as {@code long milliseconds}
      */
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime)
         throws IllegalArgumentException;
 
     /**
-    * Gets all the key-value pairs in the existing windows.
-    *
-    * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
-    * @throws InvalidStateStoreException if the store is not initialized
-    */
+     * Get all the key-value pairs in the given key range and time range from all the existing windows
+     * in backward order with respect to time (from end to beginning of time).
+     * <p>
+     * This iterator must be closed after use.
+     *
+     * @param from     the first key in the range
+     * @param to       the last key in the range
+     * @param fromTime time range start (inclusive)
+     * @param toTime   time range end (inclusive)
+     * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
+     * @throws InvalidStateStoreException if the store is not initialized
+     * @throws NullPointerException       If {@code null} is used for any key.
+     * @throws IllegalArgumentException   if duration is negative or can't be represented as {@code long milliseconds}

Review comment:
       nit: can we change `duration` --> `Instant` 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
##########
@@ -136,14 +151,26 @@ public K peekNextKey() {
         }
 
         final int comparison = compare(nextCacheKey, nextStoreKey);
-        if (comparison > 0) {
-            return deserializeStoreKey(nextStoreKey);
-        } else if (comparison < 0) {
-            return deserializeCacheKey(nextCacheKey);
+        if (!reverse) {

Review comment:
       Can we pull out this "choose next" comparison logic here and above into a separate method?
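One possible shape for the extracted helper, as a hedged sketch. The names `Source` and `chooseNext` are hypothetical, and the tie-break (cache wins on equal keys) is an assumption, since the hunk above cuts off before the equal-keys branch. It keeps the forward behavior shown in the removed lines (store when `comparison > 0`, cache when `comparison < 0`) and mirrors it when `reverse` is set.

```java
// Hypothetical helper; Source and chooseNext are illustrative names, not Kafka code.
class ChooseNextSketch {
    enum Source { CACHE, STORE }

    // comparison is compare(nextCacheKey, nextStoreKey): negative when the cache key sorts first.
    static Source chooseNext(final int comparison, final boolean reverse) {
        if (comparison == 0) {
            // Assumed tie-break: the cache entry shadows the store entry for the same key.
            return Source.CACHE;
        }
        if (!reverse) {
            // Forward iteration emits the smaller key first.
            return comparison < 0 ? Source.CACHE : Source.STORE;
        }
        // Reverse iteration emits the larger key first.
        return comparison > 0 ? Source.CACHE : Source.STORE;
    }

    public static void main(final String[] args) {
        System.out.println(chooseNext("a".compareTo("b"), false)); // CACHE
        System.out.println(chooseNext("a".compareTo("b"), true));  // STORE
    }
}
```

Both `peekNextKey` and the `next` path could then switch on the returned `Source`, so the direction handling lives in one place.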

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
##########
@@ -38,35 +38,68 @@
      *
      * @param key The key to fetch
      * @return The value or null if no value is found.
-     * @throws NullPointerException If null is used for key.
+     * @throws NullPointerException       If null is used for key.
      * @throws InvalidStateStoreException if the store is not initialized
      */
     V get(K key);
 
     /**
      * Get an iterator over a given range of keys. This iterator must be closed after use.
      * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
-     * and must not return null values. No ordering guarantees are provided.
+     * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not represent key order.
+     *
      * @param from The first key that could be in the range
-     * @param to The last key that could be in the range
+     * @param to   The last key that could be in the range
      * @return The iterator for this range.
-     * @throws NullPointerException If null is used for from or to.
+     * @throws NullPointerException       If null is used for from or to.
      * @throws InvalidStateStoreException if the store is not initialized
      */
     KeyValueIterator<K, V> range(K from, K to);
 
+    /**
+     * Get a reverse iterator over a given range of keys. This iterator must be closed after use.
+     * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
+     * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not represent key order.
+     *
+     * @param from The last key that could be in the range
+     * @param to   The first key that could be in the range

Review comment:
      We should be pretty explicit about the expected relation between these two inputs, since we throw an exception if it's violated. I get that by "first" and "last" you mean "smallest" and "largest", but I think someone could interpret this the other way around, since technically the "first" key of a reverse range query is the largest.
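As an illustration of Javadoc that spells out the relation, here is a sketch using `java.util.TreeMap` as an in-memory stand-in. `ReverseRangeSketch.reverseRange` is a hypothetical helper rather than the store method under review, and throwing `IllegalArgumentException` when `from > to` follows the comment's description, not a confirmed store contract.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory stand-in for a store; reverseRange here is a hypothetical helper, not the Kafka method.
class ReverseRangeSketch {
    /**
     * Get a reverse iterator over a given range of keys.
     *
     * @param from the smallest key that could be in the range (inclusive); must not be greater than {@code to}
     * @param to   the largest key that could be in the range (inclusive)
     * @return entries with {@code from <= key <= to}, from the largest key down to the smallest
     * @throws IllegalArgumentException if {@code from} is greater than {@code to}
     */
    static <K extends Comparable<K>, V> Iterator<Map.Entry<K, V>> reverseRange(
            final NavigableMap<K, V> map, final K from, final K to) {
        if (from.compareTo(to) > 0) {
            throw new IllegalArgumentException("from (" + from + ") must not be greater than to (" + to + ")");
        }
        // Same bounds as a forward range; only the iteration order is flipped.
        return map.subMap(from, true, to, true).descendingMap().entrySet().iterator();
    }

    public static void main(final String[] args) {
        final NavigableMap<String, Integer> map = new TreeMap<>();
        map.put("a", 1);
        map.put("b", 2);
        map.put("c", 3);
        map.put("d", 4);
        reverseRange(map, "b", "d").forEachRemaining(e -> System.out.println(e.getKey())); // d, c, b
    }
}
```

Phrasing the bounds as "smallest" and "largest" keeps the parameter order identical to `range`, so only the iteration direction differs between the two calls.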




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org