Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/07 16:21:08 UTC

[GitHub] [kafka] jeqo opened a new pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

jeqo opened a new pull request #9137:
URL: https://github.com/apache/kafka/pull/9137


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r470311062



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##########
@@ -29,7 +29,7 @@
                           final RocksIterator newIterator,
                           final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
                           final Bytes prefix) {
-        super(name, newIterator, openIterators);
+        super(name, newIterator, openIterators, false);

Review comment:
       This class is still unused, so maybe we should just take it out. Someone's working on a prefix seek KIP at the moment, so it'll be replaced soon anyway.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -374,7 +412,7 @@ public Bytes peekNextKey() {
             if (next == null) {
                 return allDone();
             } else {
-                if (comparator.compare(next.key.get(), upperBoundKey) <= 0) {
+                if (comparator.compare(next.key.get(), lastKey) <= 0) {

Review comment:
       We need to branch on `reverse` here too, right?
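
A minimal sketch of what branching on direction in the bound check could look like. `RangeBoundSketch`, `withinBound`, and the hand-written comparator are hypothetical stand-ins for illustration, not the code in `RocksDBTimestampedStore`:

```java
import java.util.Comparator;

final class RangeBoundSketch {
    // Hypothetical stand-in for a lexicographic byte[] comparator (bytes treated as unsigned).
    static final Comparator<byte[]> LEXICO = (a, b) -> {
        final int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            final int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // Forward iteration is done once the key passes the upper bound;
    // reverse iteration is done once the key drops below the lower bound.
    static boolean withinBound(final byte[] key, final byte[] lastKey, final boolean reverse) {
        final int cmp = LEXICO.compare(key, lastKey);
        if (reverse) {
            return cmp >= 0;
        } else {
            return cmp <= 0;
        }
    }
}
```

Here `lastKey` plays the role of the upper bound when iterating forward and of the lower bound when iterating in reverse, which is why a single `<= 0` comparison cannot serve both directions.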

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -306,26 +319,41 @@ public synchronized boolean hasNext() {
                 } else {
                     next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
                     nextWithTimestamp = null;
-                    iterWithTimestamp.next();
+                    if (reverse) {
+                        iterWithTimestamp.prev();
+                    } else {
+                        iterWithTimestamp.next();
+                    }
                 }
             } else {
                 if (nextWithTimestamp == null) {
                     next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
                     nextNoTimestamp = null;
-                    iterNoTimestamp.next();
+                    if (reverse) {
+                        iterNoTimestamp.prev();
+                    } else {
+                        iterNoTimestamp.next();
+                    }
                 } else {
                     if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) {

Review comment:
       If `compare(noTimestamp, withTimestamp) <= 0` then `withTimestamp >= noTimestamp`, so for the reverse case we would actually want to return `withTimestamp` next here (and vice versa in the else block).
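
The point about picking the larger key in reverse can be illustrated with a small merge over two sorted sources. This is only a sketch under simplifying assumptions: plain `String` lists stand in for the two column-family iterators, the class and method names are hypothetical, and the two inputs are assumed to hold disjoint keys.

```java
import java.util.ArrayList;
import java.util.List;

final class DualMergeSketch {
    private static boolean inBounds(final int idx, final List<?> list) {
        return idx >= 0 && idx < list.size();
    }

    // Merges two ascending lists. In reverse mode both lists are walked from
    // the end and the larger of the two current keys is emitted first.
    static List<String> merge(final List<String> noTimestamp,
                              final List<String> withTimestamp,
                              final boolean reverse) {
        final List<String> out = new ArrayList<>();
        final int step = reverse ? -1 : 1;
        int i = reverse ? noTimestamp.size() - 1 : 0;
        int j = reverse ? withTimestamp.size() - 1 : 0;
        while (inBounds(i, noTimestamp) || inBounds(j, withTimestamp)) {
            final boolean takeNoTimestamp;
            if (!inBounds(j, withTimestamp)) {
                takeNoTimestamp = true;
            } else if (!inBounds(i, noTimestamp)) {
                takeNoTimestamp = false;
            } else {
                final int cmp = noTimestamp.get(i).compareTo(withTimestamp.get(j));
                // Forward: the smaller key goes first. Reverse: the larger key goes first.
                takeNoTimestamp = reverse ? cmp > 0 : cmp <= 0;
            }
            if (takeNoTimestamp) {
                out.add(noTimestamp.get(i));
                i += step;
            } else {
                out.add(withTimestamp.get(j));
                j += step;
            }
        }
        return out;
    }
}
```

With `cmp <= 0` the forward branch emits `noTimestamp`, while the reverse branch emits `withTimestamp`, mirroring the comment above.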

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
##########
@@ -58,7 +61,11 @@ public synchronized boolean hasNext() {
             return allDone();
         } else {
             next = getKeyValue();
-            iter.next();
+            if (reverse) {

Review comment:
       Instead of storing and checking the `reverse` flag on every iteration, can we define something like
   ```
   java.util.function.Consumer<RocksIterator> advanceIterator = reverse ? RocksIterator::prev : RocksIterator::next;
   ```
   so that we can just call `advanceIterator.accept(iter)` unconditionally?
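
A self-contained sketch of this suggestion, resolving the direction once instead of checking a flag on every step. `Cursor` is a hypothetical stand-in for a `RocksIterator`-style cursor so that the example runs without RocksDB on the classpath:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class AdvanceSketch {
    // Hypothetical cursor over a sorted list, loosely modelled on next()/prev()/isValid().
    static final class Cursor {
        private final List<String> keys;
        private int pos;

        Cursor(final List<String> keys, final int start) {
            this.keys = keys;
            this.pos = start;
        }

        boolean isValid() {
            return pos >= 0 && pos < keys.size();
        }

        String key() {
            return keys.get(pos);
        }

        void next() {
            pos++;
        }

        void prev() {
            pos--;
        }
    }

    static List<String> drain(final List<String> keys, final boolean reverse) {
        // The direction is chosen once here; the loop below never looks at the flag again.
        final Consumer<Cursor> advance = reverse ? Cursor::prev : Cursor::next;
        final Cursor cursor = new Cursor(keys, reverse ? keys.size() - 1 : 0);
        final List<String> out = new ArrayList<>();
        while (cursor.isValid()) {
            out.add(cursor.key());
            advance.accept(cursor);
        }
        return out;
    }
}
```

The trade-off is one extra field holding the function reference in exchange for a loop body with no branch on the direction.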

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##########
@@ -188,7 +188,55 @@ public void testPutGetRange() {
     }
 
     @Test
-    public void testPutGetRangeWithDefaultSerdes() {
+    public void testPutGetReverseRange() {

Review comment:
       I think we should add some tests to `RocksDBTimestampedStoreTest`. I thought it would extend `AbstractKeyValueStoreTest` and thus benefit from everything you added here, but that doesn't seem to be the case :/
   Personally I found the `RocksDBDualCFIterator` logic a bit difficult to follow even before the reverse iteration, so it would be nice to have some tests specifically covering reverse iterators over multi-column-family timestamped stores.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
##########
@@ -321,67 +368,94 @@ public void shouldClearNamespaceCacheOnClose() {
         assertEquals(0, cache.size());
     }
 
-    @Test(expected = InvalidStateStoreException.class)

Review comment:
       Awesome, thank you for cleaning up the whole class 🙏 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -374,7 +412,7 @@ public Bytes peekNextKey() {
             if (next == null) {
                 return allDone();
             } else {
-                if (comparator.compare(next.key.get(), upperBoundKey) <= 0) {
+                if (comparator.compare(next.key.get(), lastKey) <= 0) {

Review comment:
       Kind of unrelated, but WDYT about renaming `RocksDBDualCFIterator` to `RocksDBDualCFAllIterator` or something on the side? I feel like these iterators could be cleaned up a bit in general to be more understandable -- for example, it's weird that we do the `iterator#seek`-ing in the actual `all()` method but for range queries we do the seeking inside the iterator constructor. 
   Just thinking out loud though, we can do some followup refactoring once this is merged







[GitHub] [kafka] vvcephei commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r472531237



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##########
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.rocksdb.RocksIterator;
-
-import java.util.Set;
-
-class RocksDBPrefixIterator extends RocksDbIterator {

Review comment:
       Was this class unused or something?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -249,7 +248,23 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         validateStoreOpen();
         final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().range(from, to);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().range(cacheName, from, to);
-        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator);
+        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false);
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
+                                                        final Bytes to) {
+        if (from.compareTo(to) > 0) {
+            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+                + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
+                "Note that the built-in numerical serdes do not follow this for negative numbers");

Review comment:
       This warning seems to miss the most likely scenario, that the user just passed the arguments in the wrong order.
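
One way the check could surface the swapped-arguments case first is sketched below. The helper name and the message wording are purely illustrative assumptions, not the text that was eventually committed:

```java
import java.util.Optional;

final class RangeArgsSketch {
    // Returns a warning when from > to, otherwise empty. The wording is illustrative only.
    static <K extends Comparable<K>> Optional<String> invalidRangeWarning(final K from, final K to) {
        if (from.compareTo(to) <= 0) {
            return Optional.empty();
        }
        return Optional.of("Returning empty iterator for invalid key range: from > to. "
            + "The arguments may have been passed in the wrong order, "
            + "or the serdes may not preserve ordering of the serialized bytes.");
    }
}
```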







[GitHub] [kafka] vvcephei merged pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9137:
URL: https://github.com/apache/kafka/pull/9137


   





[GitHub] [kafka] jeqo commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r470536034



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##########
@@ -188,7 +188,55 @@ public void testPutGetRange() {
     }
 
     @Test
-    public void testPutGetRangeWithDefaultSerdes() {
+    public void testPutGetReverseRange() {

Review comment:
       Just realized that; I also thought that path was tested. Good catch!







[GitHub] [kafka] jeqo commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r471459574



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##########
@@ -188,7 +188,55 @@ public void testPutGetRange() {
     }
 
     @Test
-    public void testPutGetRangeWithDefaultSerdes() {
+    public void testPutGetReverseRange() {

Review comment:
       @ableegoldman I have extended `RocksDBTimestampedStoreTest` to use `reverseAll` and `reverseRange` as part of the current tests. 
   Unfortunately, the `AbstractKeyValueStoreTest` tests do not fit the creation path of timestamped stores, as pre-inserted data is required.
   Will add this to the same JIRA ticket to consider when refactoring iterators and tests.
   
    







[GitHub] [kafka] jeqo commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r469314938



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##########
@@ -422,6 +503,21 @@ public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
                     " Note that the built-in numerical serdes do not follow this for negative numbers")
             );
         }
+    }
+
+    @Test
+    public void shouldNotThrowInvalidReverseRangeExceptionWithNegativeFromKey() {

Review comment:
       ack. added.







[GitHub] [kafka] jeqo commented on pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#issuecomment-673574377


   @ableegoldman this should be ready for another review :)





[GitHub] [kafka] vvcephei commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r473068515



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##########
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.rocksdb.RocksIterator;
-
-import java.util.Set;
-
-class RocksDBPrefixIterator extends RocksDbIterator {

Review comment:
       Yes, it looks like it was. Thanks for cleaning it up!







[GitHub] [kafka] jeqo commented on pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#issuecomment-677489429


   @vvcephei thanks for your feedback! Inverting the flag actually makes things more readable. Changing to `forward` and `if (forward) {} else {}`.





[GitHub] [kafka] vvcephei commented on pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#issuecomment-678575081


   It looks like Java 14 only failed while attempting to download some docs from Oracle, and Java 8 failed on this unrelated test: `org.apache.kafka.clients.admin.KafkaAdminClientTest.testMetadataRetries`





[GitHub] [kafka] jeqo commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r469225368



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##########
@@ -29,32 +29,41 @@
     // comparator to be pluggable, and the default is lexicographic, so it's
     // safe to just force lexicographic comparator here for now.
     private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-    private final byte[] rawToKey;
+    private final byte[] rawLastKey;
+    private final boolean reverse;
 
     RocksDBRangeIterator(final String storeName,
                          final RocksIterator iter,
                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
                          final Bytes from,
-                         final Bytes to) {
-        super(storeName, iter, openIterators);
-        iter.seek(from.get());
-        rawToKey = to.get();
-        if (rawToKey == null) {
+                         final Bytes to,
+                         final boolean reverse) {
+        super(storeName, iter, openIterators, reverse);
+        this.reverse = reverse;
+        if (reverse) {
+            iter.seekForPrev(to.get());
+            rawLastKey = from.get();
+        } else {
+            iter.seek(from.get());
+            rawLastKey = to.get();
+        }
+        if (rawLastKey == null) {
             throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);

Review comment:
       Right. I've moved this check into each branch so that each one sets the correct exception message.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r468843465



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
##########
@@ -58,7 +61,8 @@ public synchronized boolean hasNext() {
             return allDone();
         } else {
             next = getKeyValue();
-            iter.next();
+            if (reverse) iter.prev();

Review comment:
       separate lines 🙂 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##########
@@ -29,32 +29,41 @@
     // comparator to be pluggable, and the default is lexicographic, so it's
     // safe to just force lexicographic comparator here for now.
     private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-    private final byte[] rawToKey;
+    private final byte[] rawLastKey;
+    private final boolean reverse;
 
     RocksDBRangeIterator(final String storeName,
                          final RocksIterator iter,
                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
                          final Bytes from,
-                         final Bytes to) {
-        super(storeName, iter, openIterators);
-        iter.seek(from.get());
-        rawToKey = to.get();
-        if (rawToKey == null) {
+                         final Bytes to,
+                         final boolean reverse) {
+        super(storeName, iter, openIterators, reverse);
+        this.reverse = reverse;
+        if (reverse) {
+            iter.seekForPrev(to.get());
+            rawLastKey = from.get();
+        } else {
+            iter.seek(from.get());
+            rawLastKey = to.get();
+        }
+        if (rawLastKey == null) {
             throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);

Review comment:
       nit: `RawToKey` --> `RawLastKey`

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
##########
@@ -294,13 +305,27 @@ public void shouldIterateOverRange() {
         assertEquals(items, results.size());
     }
 
+    @Test
+    public void shouldReverseIterateOverRange() {
+        final int items = addItemsToCache();
+        final KeyValueIterator<Bytes, byte[]> range =
+            store.reverseRange(bytesKey(String.valueOf(0)), bytesKey(String.valueOf(items)));
+        final List<Bytes> results = new ArrayList<>();
+        while (range.hasNext()) {
+            results.add(range.next().key);
+        }
+        assertEquals(items, results.size());

Review comment:
       Can we add some tests that verify the actual contents + order of the reverse range? 
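
To illustrate why a size-only assertion is not enough, here is a small sketch in which a `TreeSet` stands in for the store's keys. The class and helper names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

final class ReverseOrderCheckSketch {
    // Hypothetical stand-in for a store's key iterator over [from, to], both bounds inclusive.
    static Iterator<String> keys(final TreeSet<String> store,
                                 final String from,
                                 final String to,
                                 final boolean reverse) {
        final TreeSet<String> snapshot = new TreeSet<>(store.subSet(from, true, to, true));
        if (reverse) {
            return snapshot.descendingIterator();
        } else {
            return snapshot.iterator();
        }
    }

    // Collects the iterator into a list so that contents and order can be compared at once.
    static List<String> toList(final Iterator<String> iter) {
        final List<String> out = new ArrayList<>();
        while (iter.hasNext()) {
            out.add(iter.next());
        }
        return out;
    }
}
```

A forward and a reverse iterator over the same range return the same number of elements, so only comparing the collected list against the expected descending sequence would catch an iterator that walks in the wrong direction.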

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -150,8 +166,9 @@ public void close() {
     private class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]> {
         private final Iterator<Bytes> iter;
 
-        private InMemoryKeyValueIterator(final Set<Bytes> keySet) {
-            this.iter = new TreeSet<>(keySet).iterator();
+        private InMemoryKeyValueIterator(final Set<Bytes> keySet, final boolean reverse) {
+            if (reverse) this.iter = new TreeSet<>(keySet).descendingIterator();
+            else this.iter = new TreeSet<>(keySet).iterator();

Review comment:
       nit: Use braces & separate lines

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -110,7 +111,15 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
 
     @Override
     public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        return range(from, to, false);
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from, final Bytes to) {
+        return range(from, to, true);
+    }
 
+    KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to, final boolean reverse) {

Review comment:
       Should be private

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
##########
@@ -38,35 +38,68 @@
      *
      * @param key The key to fetch
      * @return The value or null if no value is found.
-     * @throws NullPointerException If null is used for key.
+     * @throws NullPointerException       If null is used for key.
      * @throws InvalidStateStoreException if the store is not initialized
      */
     V get(K key);
 
     /**
      * Get an iterator over a given range of keys. This iterator must be closed after use.
      * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
-     * and must not return null values. No ordering guarantees are provided.
-     * @param from The first key that could be in the range
-     * @param to The last key that could be in the range
-     * @return The iterator for this range.
-     * @throws NullPointerException If null is used for from or to.
+     * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not represent key order.
+     *
+     * @param from The first key that could be in the range, where iteration starts from.
+     * @param to   The last key that could be in the range, where iteration ends.
+     * @return The iterator for this range, from smallest to largest bytes.
+     * @throws NullPointerException       If null is used for from or to.
      * @throws InvalidStateStoreException if the store is not initialized
      */
     KeyValueIterator<K, V> range(K from, K to);
 
+    /**
+     * Get a reverse iterator over a given range of keys. This iterator must be closed after use.
+     * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
+     * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not represent key order.
+     *
+     * @param from The first key that could be in the range, where iteration ends.
+     * @param to   The last key that could be in the range, where iteration starts from.

Review comment:
       Seems a bit tricky to say that _to_ is the variable where iteration starts _from_ 😉  But I can see it both ways, so being clear in the javadocs is good enough for me
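
The `from`/`to` semantics described in the javadoc can be sketched with a `TreeMap` standing in for the store. This is an illustration of the documented contract under that assumption, not the store implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class ReverseRangeSemanticsSketch {
    // from is still the smaller bound and to the larger one; only the direction
    // of travel changes, so iteration begins at to and ends at from.
    static List<String> reverseRange(final TreeMap<String, String> store,
                                     final String from,
                                     final String to) {
        return new ArrayList<>(store.subMap(from, true, to, true).descendingMap().keySet());
    }
}
```

Note that `TreeMap.subMap` throws `IllegalArgumentException` when `from > to`, whereas the store in this PR logs a warning and returns an empty iterator, so the sketch only covers the valid-range case.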

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
##########
@@ -281,15 +281,24 @@ public boolean isEmpty() {
     }
 
     synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
-        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true));
+        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), false);
     }
 
-    private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet) {
-        return new TreeSet<>(keySet).iterator();
+    synchronized Iterator<Bytes> reverseKeyRange(final Bytes from, final Bytes to) {
+        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), true);
+    }
+
+    private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet, final boolean reverse) {
+        if (reverse) return new TreeSet<>(keySet).descendingIterator();

Review comment:
       nit: braces + separate lines

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##########
@@ -29,32 +29,41 @@
     // comparator to be pluggable, and the default is lexicographic, so it's
     // safe to just force lexicographic comparator here for now.
     private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-    private final byte[] rawToKey;
+    private final byte[] rawLastKey;
+    private final boolean reverse;
 
     RocksDBRangeIterator(final String storeName,
                          final RocksIterator iter,
                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
                          final Bytes from,
-                         final Bytes to) {
-        super(storeName, iter, openIterators);
-        iter.seek(from.get());
-        rawToKey = to.get();
-        if (rawToKey == null) {
+                         final Bytes to,
+                         final boolean reverse) {
+        super(storeName, iter, openIterators, reverse);
+        this.reverse = reverse;
+        if (reverse) {
+            iter.seekForPrev(to.get());
+            rawLastKey = from.get();
+        } else {
+            iter.seek(from.get());
+            rawLastKey = to.get();
+        }
+        if (rawLastKey == null) {
             throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);
         }
     }
 
     @Override
     public KeyValue<Bytes, byte[]> makeNext() {
         final KeyValue<Bytes, byte[]> next = super.makeNext();
-
         if (next == null) {
             return allDone();
         } else {
-            if (comparator.compare(next.key.get(), rawToKey) <= 0) {
-                return next;
+            if (!reverse) {
+                if (comparator.compare(next.key.get(), rawLastKey) <= 0) return next;
+                else return allDone();

Review comment:
       nit: braces & separate lines

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##########
@@ -422,6 +503,21 @@ public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
                     " Note that the built-in numerical serdes do not follow this for negative numbers")
             );
         }
+    }
+
+    @Test
+    public void shouldNotThrowInvalidReverseRangeExceptionWithNegativeFromKey() {

Review comment:
       Can we add tests for some other invalid range cases? For example, with both bounds positive but `from > to`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -306,26 +313,29 @@ public synchronized boolean hasNext() {
                 } else {
                     next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
                     nextWithTimestamp = null;
-                    iterWithTimestamp.next();
+                    if (reverse) iterWithTimestamp.prev();

Review comment:
       braces & separate lines here and below

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
##########
@@ -339,12 +366,24 @@ public void shouldThrowIfTryingToDoRangeQueryOnClosedCachingStore() {
         store.range(bytesKey("a"), bytesKey("b"));
     }
 
+    @Test(expected = InvalidStateStoreException.class)

Review comment:
       Use `assertThrows` -- we've been (slowly) migrating away from `@Test(expected)` in the Streams tests

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -193,22 +193,26 @@ public void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
 
         @Override
         public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
-                                                     final Bytes to) {
+                                                     final Bytes to,
+                                                     final boolean reverse) {
             return new RocksDBDualCFRangeIterator(
                 name,
                 db.newIterator(newColumnFamily),
                 db.newIterator(oldColumnFamily),
                 from,
-                to);
+                to,
+                reverse);
         }
 
         @Override
-        public KeyValueIterator<Bytes, byte[]> all() {
+        public KeyValueIterator<Bytes, byte[]> all(final boolean reverse) {
             final RocksIterator innerIterWithTimestamp = db.newIterator(newColumnFamily);
-            innerIterWithTimestamp.seekToFirst();
+            if (reverse) innerIterWithTimestamp.seekToLast();

Review comment:
       braces & separate lines

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
##########
@@ -150,6 +161,17 @@ public void shouldThrowUnsupportedOperationExceptionWhileRemove() {
         } catch (final UnsupportedOperationException e) { }
     }
 
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionWhileReverseRange() {
+        stubOneUnderlying.put("a", "1");
+        stubOneUnderlying.put("b", "1");
+        final KeyValueIterator<String, String> keyValueIterator = theStore.reverseRange("a", "b");
+        try {
+            keyValueIterator.remove();

Review comment:
       use `assertThrows` here as well

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
##########
@@ -199,11 +236,44 @@ public void shouldSupportRangeAcrossMultipleKVStores() {
         cache.put("x", "x");
 
         final List<KeyValue<String, String>> results = toList(theStore.range("a", "e"));
+        assertArrayEquals(
+            asList(
+                new KeyValue<>("a", "a"),
+                new KeyValue<>("b", "b"),
+                new KeyValue<>("c", "c"),
+                new KeyValue<>("d", "d")
+            ).toArray(),
+            results.toArray());
+    }
+
+    @Test
+    public void shouldSupportReverseRangeAcrossMultipleKVStores() {
+        final KeyValueStore<String, String> cache = newStoreInstance();
+        stubProviderTwo.addStore(storeName, cache);
+
+        stubOneUnderlying.put("a", "a");
+        stubOneUnderlying.put("b", "b");
+        stubOneUnderlying.put("z", "z");
+
+        cache.put("c", "c");
+        cache.put("d", "d");
+        cache.put("x", "x");
+
+        final List<KeyValue<String, String>> results = toList(theStore.reverseRange("a", "e"));
         assertTrue(results.contains(new KeyValue<>("a", "a")));
         assertTrue(results.contains(new KeyValue<>("b", "b")));
         assertTrue(results.contains(new KeyValue<>("c", "c")));
         assertTrue(results.contains(new KeyValue<>("d", "d")));
         assertEquals(4, results.size());
+        //FIXME: order does not hold between stores, how to validate order here?

Review comment:
       I think the best we can do is just make sure that order is correct within a store, i.e. if `a` and `m` are both in `stubOneUnderlying`, then make sure the reverse range returns `m` before `a`.
   
   I also think it would be fine to just make sure all the expected values are returned without checking the order, since there are other tests to verify that the order within a store is correct
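   
   A sketch of the per-store check, under the assumption that the composite store simply concatenates each underlying store's descending range. The class and helper names are hypothetical, and `TreeMap` stands in for the underlying stores:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

// Hypothetical model of a composite read-only store; not the Kafka class.
final class CompositeReverseRangeSketch {
    // Concatenates each store's keys in [from, to], descending within each store.
    static List<String> reverseRange(final List<NavigableMap<String, String>> stores,
                                     final String from,
                                     final String to) {
        final List<String> keys = new ArrayList<>();
        for (final NavigableMap<String, String> store : stores) {
            keys.addAll(store.subMap(from, true, to, true).descendingKeySet());
        }
        return keys;
    }

    // True if both keys are present and `first` is returned before `second`.
    static boolean comesBefore(final List<String> keys, final String first, final String second) {
        final int i = keys.indexOf(first);
        final int j = keys.indexOf(second);
        return i >= 0 && j >= 0 && i < j;
    }
}
```

   With store one holding `a`, `b`, `z` and store two holding `c`, `d`, `x`, a reverse range over `a`..`e` comes back as `b`, `a`, `d`, `c` in this model: not globally sorted, but `b` precedes `a` and `d` precedes `c`, which is what the test can assert.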

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##########
@@ -29,32 +29,41 @@
     // comparator to be pluggable, and the default is lexicographic, so it's
     // safe to just force lexicographic comparator here for now.
     private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-    private final byte[] rawToKey;
+    private final byte[] rawLastKey;
+    private final boolean reverse;
 
     RocksDBRangeIterator(final String storeName,
                          final RocksIterator iter,
                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
                          final Bytes from,
-                         final Bytes to) {
-        super(storeName, iter, openIterators);
-        iter.seek(from.get());
-        rawToKey = to.get();
-        if (rawToKey == null) {
+                         final Bytes to,
+                         final boolean reverse) {
+        super(storeName, iter, openIterators, reverse);
+        this.reverse = reverse;
+        if (reverse) {
+            iter.seekForPrev(to.get());
+            rawLastKey = from.get();
+        } else {
+            iter.seek(from.get());
+            rawLastKey = to.get();
+        }
+        if (rawLastKey == null) {
             throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);

Review comment:
       Also, the key reported in this message should be `from` for the reverse case, right?







[GitHub] [kafka] jeqo commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r469315300



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
##########
@@ -294,13 +305,27 @@ public void shouldIterateOverRange() {
         assertEquals(items, results.size());
     }
 
+    @Test
+    public void shouldReverseIterateOverRange() {
+        final int items = addItemsToCache();
+        final KeyValueIterator<Bytes, byte[]> range =
+            store.reverseRange(bytesKey(String.valueOf(0)), bytesKey(String.valueOf(items)));
+        final List<Bytes> results = new ArrayList<>();
+        while (range.hasNext()) {
+            results.add(range.next().key);
+        }
+        assertEquals(items, results.size());

Review comment:
       ack. added.







[GitHub] [kafka] vvcephei commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r473071200



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
##########
@@ -58,7 +62,7 @@ public synchronized boolean hasNext() {
             return allDone();
         } else {
             next = getKeyValue();
-            iter.next();
+            advanceIterator.accept(iter);

Review comment:
       clever!
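   
   For readers skimming the thread, the idea in the diff is to pick the advance direction once and reuse it on every step, instead of branching on `reverse` inside `hasNext()`. A self-contained sketch, where `Cursor` is a hypothetical list-backed stand-in for the RocksDB iterator:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class DirectionalIteratorSketch {
    // Hypothetical cursor over a list, standing in for a RocksIterator.
    static final class Cursor {
        private final List<String> items;
        private int pos;

        Cursor(final List<String> items) {
            this.items = items;
        }

        void seekToFirst() {
            pos = 0;
        }

        void seekToLast() {
            pos = items.size() - 1;
        }

        void next() {
            pos++;
        }

        void prev() {
            pos--;
        }

        boolean isValid() {
            return pos >= 0 && pos < items.size();
        }

        String key() {
            return items.get(pos);
        }
    }

    // Collects all keys in the requested direction.
    static List<String> drain(final List<String> items, final boolean reverse) {
        final Cursor cursor = new Cursor(items);
        // The direction is decided once, here, and then applied on every step.
        final Consumer<Cursor> advance = reverse ? Cursor::prev : Cursor::next;
        if (reverse) {
            cursor.seekToLast();
        } else {
            cursor.seekToFirst();
        }
        final List<String> out = new ArrayList<>();
        while (cursor.isValid()) {
            out.add(cursor.key());
            advance.accept(cursor);
        }
        return out;
    }
}
```

   `drain` over `a`, `b`, `c` returns `c`, `b`, `a` when `reverse` is true and `a`, `b`, `c` otherwise.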







[GitHub] [kafka] jeqo commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r469906366



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
##########
@@ -199,11 +236,44 @@ public void shouldSupportRangeAcrossMultipleKVStores() {
         cache.put("x", "x");
 
         final List<KeyValue<String, String>> results = toList(theStore.range("a", "e"));
+        assertArrayEquals(
+            asList(
+                new KeyValue<>("a", "a"),
+                new KeyValue<>("b", "b"),
+                new KeyValue<>("c", "c"),
+                new KeyValue<>("d", "d")
+            ).toArray(),
+            results.toArray());
+    }
+
+    @Test
+    public void shouldSupportReverseRangeAcrossMultipleKVStores() {
+        final KeyValueStore<String, String> cache = newStoreInstance();
+        stubProviderTwo.addStore(storeName, cache);
+
+        stubOneUnderlying.put("a", "a");
+        stubOneUnderlying.put("b", "b");
+        stubOneUnderlying.put("z", "z");
+
+        cache.put("c", "c");
+        cache.put("d", "d");
+        cache.put("x", "x");
+
+        final List<KeyValue<String, String>> results = toList(theStore.reverseRange("a", "e"));
         assertTrue(results.contains(new KeyValue<>("a", "a")));
         assertTrue(results.contains(new KeyValue<>("b", "b")));
         assertTrue(results.contains(new KeyValue<>("c", "c")));
         assertTrue(results.contains(new KeyValue<>("d", "d")));
         assertEquals(4, results.size());
+        //FIXME: order does not hold between stores, how to validate order here?

Review comment:
       ack. makes sense.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r474213204



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##########
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.rocksdb.RocksIterator;
-
-import java.util.Set;
-
-class RocksDBPrefixIterator extends RocksDbIterator {

Review comment:
       Yeah, this was left over from the FKJ (foreign-key join) work, IIRC







[GitHub] [kafka] jeqo commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r471451498



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -374,7 +412,7 @@ public Bytes peekNextKey() {
             if (next == null) {
                 return allDone();
             } else {
-                if (comparator.compare(next.key.get(), upperBoundKey) <= 0) {
+                if (comparator.compare(next.key.get(), lastKey) <= 0) {

Review comment:
       https://issues.apache.org/jira/browse/KAFKA-10409







[GitHub] [kafka] jeqo commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r469315828



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
##########
@@ -339,12 +366,24 @@ public void shouldThrowIfTryingToDoRangeQueryOnClosedCachingStore() {
         store.range(bytesKey("a"), bytesKey("b"));
     }
 
+    @Test(expected = InvalidStateStoreException.class)

Review comment:
       migrated the whole class. will apply this to the other PRs.







[GitHub] [kafka] jeqo commented on pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#issuecomment-670850075


   cc @ableegoldman 
   
   This PR is ready for review, covering related feedback from #8976





[GitHub] [kafka] ableegoldman commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r471836789



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##########
@@ -188,7 +188,55 @@ public void testPutGetRange() {
     }
 
     @Test
-    public void testPutGetRangeWithDefaultSerdes() {
+    public void testPutGetReverseRange() {

Review comment:
       Ah, that makes sense I guess. Looks like the new additions to `RocksDBTimestampedStoreTest` cover the cross-column family code path so that's good enough for now







[GitHub] [kafka] jeqo commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r471449807



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -374,7 +412,7 @@ public Bytes peekNextKey() {
             if (next == null) {
                 return allDone();
             } else {
-                if (comparator.compare(next.key.get(), upperBoundKey) <= 0) {
+                if (comparator.compare(next.key.get(), lastKey) <= 0) {

Review comment:
       Agree. I will continue with the current approach and create an issue to follow up on this.



