Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/22 00:31:14 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r475014135



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -72,22 +86,40 @@
             searchSpace.iterator(),

Review comment:
       This should be a descending iterator for the reverse case (here and the other reverse methods in this class)

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
##########
@@ -107,26 +109,76 @@ public void shouldIterateOverAllSegments() {
         assertFalse(iterator.hasNext());
     }
 
+    @Test
+    public void shouldIterateBackwardOverAllSegments() {
+        iterator = new SegmentIterator<>(
+            Arrays.asList(segmentOne, segmentTwo).iterator(),
+            hasNextCondition,
+            Bytes.wrap("a".getBytes()),
+            Bytes.wrap("z".getBytes()),
+            false);
+
+        assertTrue(iterator.hasNext());
+        assertEquals("b", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("a", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("d", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("c", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next()));

Review comment:
       This doesn't look quite right... shouldn't it be D, C, B, A? I guess in the test we just need to pass in a descending iterator over the segments.
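
A minimal sketch of why the asserted order comes out as b, a, d, c. It is not the Kafka test code: the two segments are modelled as plain `TreeMap`s holding the key/value pairs the assertions imply, and `BackwardSegmentOrderSketch`, `backwardKeys` and `segment` are hypothetical names. Reading each segment back to front while still walking the segments in ascending order only reverses the keys inside each segment; the segment order has to be descending as well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for the test setup: each "segment" is a sorted map.
final class BackwardSegmentOrderSketch {

    // Walks the segments in the order given, reading each one back to front.
    static List<String> backwardKeys(final List<TreeMap<String, String>> segments) {
        final List<String> keys = new ArrayList<>();
        for (final TreeMap<String, String> segment : segments) {
            keys.addAll(segment.descendingKeySet());
        }
        return keys;
    }

    // Builds a two-entry segment (assumed contents, taken from the asserted values).
    static TreeMap<String, String> segment(final String k1, final String v1,
                                           final String k2, final String v2) {
        final TreeMap<String, String> segment = new TreeMap<>();
        segment.put(k1, v1);
        segment.put(k2, v2);
        return segment;
    }

    public static void main(final String[] args) {
        final TreeMap<String, String> segmentOne = segment("a", "1", "b", "2");
        final TreeMap<String, String> segmentTwo = segment("c", "3", "d", "4");

        // Ascending segment order, as in the test under review.
        System.out.println(backwardKeys(Arrays.asList(segmentOne, segmentTwo))); // prints [b, a, d, c]

        // Descending segment order.
        final List<TreeMap<String, String>> descending =
            new ArrayList<>(Arrays.asList(segmentOne, segmentTwo));
        Collections.reverse(descending);
        System.out.println(backwardKeys(descending)); // prints [d, c, b, a]
    }
}
```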

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -426,7 +558,12 @@ private void getNextSegmentIterator() {
             setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());

Review comment:
       GitHub won't let me comment up there, but on line 418, shouldn't we have to decrement the currentSegmentId for the reverse case? I'm a little confused because it looks like you have test coverage for the multi-segment case and it seems to pass. Maybe I'm just tired and missing something obvious here...
   For example in `CachingWindowStoreTest#shouldFetchAndIterateOverKeyBackwardRange` the results seem to go across multiple segments, but it looks like we actually do return the record from the largest segment first? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -163,7 +164,17 @@ public void put(final Bytes key, final byte[] value, final long windowStartTimes
     @Deprecated
     @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
+        return fetch(key, timeFrom, timeTo, true);
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final Instant from, final Instant to) {
+        final long timeFrom = ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+        final long timeTo = ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));

Review comment:
       I feel like it's a little awkward to have the reverse variations accept time parameters as an `Instant` while the forward versions just use a `long`. I would have thought we could migrate the `long` methods to `Instant` at some point, but I see all these `note, this method must be kept if super#fetch(...) is removed` comments littered throughout the code... so maybe there's a reason for sticking with the `long` overrides in the innermost store layer?
   Did you come across anything that suggested a reason for keeping the long flavors? cc @guozhangwang or @mjsax -- why can't we remove these?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
##########
@@ -119,15 +118,16 @@
      * <p>
      * This iterator must be closed after use.
      *
-     * @param from      the first key in the range
-     * @param to        the last key in the range
-     * @param timeFrom  time range start (inclusive)
-     * @param timeTo    time range end (inclusive)
+     * @param from     the first key in the range
+     * @param to       the last key in the range
+     * @param timeFrom time range start (inclusive)
+     * @param timeTo   time range end (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException if one of the given keys is {@code null}
+     * @throws NullPointerException       if one of the given keys is {@code null}
      */
-    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
+    // note, this method must be kept if super#fetch(...) is removed
+    @SuppressWarnings("deprecation")
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);

Review comment:
       Do you know why we have all these ReadOnlyWindowStore methods also declared here in WindowStore? We don't need reverse variations of these I guess? 🤔  

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
##########
@@ -212,17 +316,62 @@ public boolean hasNext() {
         };
     }
 
-    @Override public KeyValueIterator<Windowed<K>, V> fetch(final K from,
-                                                            final K to,
-                                                            final Instant fromTime,
-                                                            final Instant toTime) throws IllegalArgumentException {
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                  final K to,
+                                                  final Instant fromTime,
+                                                  final Instant toTime) throws IllegalArgumentException {
         return fetch(
             from,
-            to, 
+            to,
             ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime")),
             ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime")));
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, V> backwardFetch(final K from,
+                                                          final K to,
+                                                          final Instant fromTimeInstant,
+                                                          final Instant toTimeInstant) throws IllegalArgumentException {
+        final long timeFrom = ApiUtils.validateMillisecondInstant(fromTimeInstant, prepareMillisCheckFailMsgPrefix(fromTimeInstant, "fromTimeInstant"));
+        final long timeTo = ApiUtils.validateMillisecondInstant(toTimeInstant, prepareMillisCheckFailMsgPrefix(toTimeInstant, "toTimeInstant"));
+        if (!open) {
+            throw new InvalidStateStoreException("Store is not open");
+        }
+        final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>();
+        for (long now = timeFrom; now <= timeTo; now++) {

Review comment:
       Need to flip the loop for the backward case: start at `timeTo` and count down to `timeFrom`.
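
A minimal sketch of the flipped loop, kept separate from the stub's store lookup. `FlippedLoopSketch` and `timestampsNewestFirst` are hypothetical names, and the sketch assumes `timeFrom` is greater than `Long.MIN_VALUE` so the decrement cannot wrap around.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: shows only the iteration order, not the lookup in the stub.
final class FlippedLoopSketch {

    // Visits every timestamp in [timeFrom, timeTo], newest first.
    // Assumes timeFrom > Long.MIN_VALUE so that now-- cannot overflow.
    static List<Long> timestampsNewestFirst(final long timeFrom, final long timeTo) {
        final List<Long> visited = new ArrayList<>();
        for (long now = timeTo; now >= timeFrom; now--) {
            visited.add(now);
        }
        return visited;
    }
}
```

For example, `timestampsNewestFirst(1, 3)` visits 3, then 2, then 1, so results collected inside the loop come out in descending time order.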

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -426,7 +558,12 @@ private void getNextSegmentIterator() {
             setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());
 
             current.close();
-            current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+
+            if (forward) {
+                current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+            } else {
+                current = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo);
+            }
         }
 
         private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRangeEndTime) {

Review comment:
       Just noticed that we use `==` instead of `.equals` down on line 437, can you fix that on the side?
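
A short sketch of the difference between the two comparisons. It uses `java.nio.ByteBuffer` as a stand-in for a byte-wrapping key type, which is an assumption: the operands on line 437 are not shown in this diff. `ReferenceVsValueEqualitySketch` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration: ByteBuffer stands in for a key object that wraps bytes.
final class ReferenceVsValueEqualitySketch {

    // Each call wraps a fresh byte array, so each call returns a distinct object.
    static ByteBuffer key(final String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // Reference comparison: true only if both arguments are the same object.
    static boolean sameReference(final Object a, final Object b) {
        return a == b;
    }

    // Value comparison: delegates to the type's equals(), here content equality.
    static boolean sameValue(final Object a, final Object b) {
        return a.equals(b);
    }

    public static void main(final String[] args) {
        final ByteBuffer first = key("a");
        final ByteBuffer second = key("a");
        System.out.println(sameReference(first, second)); // prints false
        System.out.println(sameValue(first, second));     // prints true
    }
}
```

With `==`, two keys with identical content only compare equal when the caller happens to pass the very same object, which is why `.equals` is the safer check.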

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
##########
@@ -135,24 +188,74 @@ public void shouldThrowInvalidStateStoreExceptionOnRebalance() {
         store.fetch("key", ofEpochMilli(1), ofEpochMilli(10));
     }
 
+    @Test(expected = InvalidStateStoreException.class)

Review comment:
       nit: use `assertThrows` 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -419,13 +504,13 @@ Long minTime() {
         }

Review comment:
       As always GitHub won't let me comment on the line I actually want to (😞 ) but I think we need a descending iterator for the reverse case in `setRecordIterator` (lines 411 & 413)
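
A minimal sketch of choosing an ascending or descending iterator over a key range, assuming the records live in a `ConcurrentNavigableMap`. The `String` key/value types and the `DescendingRecordIteratorSketch`, `recordIterator` and `keys` names are hypothetical; this is not the body of `setRecordIterator`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical illustration of picking the iteration direction over a sorted map.
final class DescendingRecordIteratorSketch {

    // Returns an iterator over [from, to] (both inclusive), ascending or descending.
    static Iterator<Map.Entry<String, String>> recordIterator(
            final ConcurrentNavigableMap<String, String> records,
            final String from,
            final String to,
            final boolean forward) {
        final ConcurrentNavigableMap<String, String> range = records.subMap(from, true, to, true);
        return forward
            ? range.entrySet().iterator()
            : range.descendingMap().entrySet().iterator();
    }

    // Drains the iterator into a list of keys so the order is easy to inspect.
    static List<String> keys(final ConcurrentNavigableMap<String, String> records,
                             final String from,
                             final String to,
                             final boolean forward) {
        final List<String> keys = new ArrayList<>();
        final Iterator<Map.Entry<String, String>> it = recordIterator(records, from, to, forward);
        while (it.hasNext()) {
            keys.add(it.next().getKey());
        }
        return keys;
    }

    public static void main(final String[] args) {
        final ConcurrentNavigableMap<String, String> records = new ConcurrentSkipListMap<>();
        records.put("a", "1");
        records.put("b", "2");
        records.put("c", "3");
        records.put("d", "4");
        System.out.println(keys(records, "b", "d", true));  // prints [b, c, d]
        System.out.println(keys(records, "b", "d", false)); // prints [d, c, b]
    }
}
```

`descendingMap()` returns a view rather than a copy, so the reverse case does not need to materialise the range first.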

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
##########
@@ -123,7 +173,8 @@ public void shouldIterateBothStoreAndCache() {
 
     private MergedSortedCacheWindowStoreKeyValueIterator createIterator(
         final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs,
-        final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs
+        final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs,
+        final boolean forward

Review comment:
       nit: the existing style here is inconsistent with the rest of Streams; it should have the first parameter on the same line as the method declaration (and everything else aligned to that)

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -337,25 +462,32 @@ public synchronized void close() {
 
         private CacheIteratorWrapper(final Bytes key,
                                      final long timeFrom,
-                                     final long timeTo) {
-            this(key, key, timeFrom, timeTo);
+                                     final long timeTo,
+                                     final boolean forward) {
+            this(key, key, timeFrom, timeTo, forward);
         }
 
         private CacheIteratorWrapper(final Bytes keyFrom,
                                      final Bytes keyTo,
                                      final long timeFrom,
-                                     final long timeTo) {
+                                     final long timeTo,
+                                     final boolean forward) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             this.timeTo = timeTo;
             this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
+            this.forward = forward;
 
             this.segmentInterval = cacheFunction.getSegmentInterval();
             this.currentSegmentId = cacheFunction.segmentId(timeFrom);

Review comment:
       For the reverse case we should start on the largest segment, I think (largest segment == farthest advanced in time)
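
A sketch of the segment walk in both directions, under simplifying assumptions: timestamps are non-negative, a segment ID is `timestamp / segmentInterval`, and the clamp against `maxObservedTimestamp` is left out. `SegmentWalkSketch`, `segmentId` and `segmentOrder` are hypothetical names, not the `CacheIteratorWrapper` fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the order in which segments are visited.
final class SegmentWalkSketch {

    // Assumed mapping from a timestamp to its segment (non-negative timestamps only).
    static long segmentId(final long timestamp, final long segmentInterval) {
        return timestamp / segmentInterval;
    }

    // Lists the segment IDs covering [timeFrom, timeTo] in visiting order.
    static List<Long> segmentOrder(final long timeFrom,
                                   final long timeTo,
                                   final long segmentInterval,
                                   final boolean forward) {
        final long first = segmentId(timeFrom, segmentInterval);
        final long last = segmentId(timeTo, segmentInterval);
        final List<Long> order = new ArrayList<>();
        if (forward) {
            // Forward: start at the earliest segment and increment.
            for (long id = first; id <= last; id++) {
                order.add(id);
            }
        } else {
            // Reverse: start at the latest segment and decrement.
            for (long id = last; id >= first; id--) {
                order.add(id);
            }
        }
        return order;
    }
}
```

With a segment interval of 100, `segmentOrder(0, 250, 100, false)` visits segments 2, 1, 0, while the forward call visits 0, 1, 2.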



