Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/25 00:13:09 UTC

[GitHub] [kafka] jeqo commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

jeqo commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r475910689



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
##########
@@ -119,15 +118,16 @@
      * <p>
      * This iterator must be closed after use.
      *
-     * @param from      the first key in the range
-     * @param to        the last key in the range
-     * @param timeFrom  time range start (inclusive)
-     * @param timeTo    time range end (inclusive)
+     * @param from     the first key in the range
+     * @param to       the last key in the range
+     * @param timeFrom time range start (inclusive)
+     * @param timeTo   time range end (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException if one of the given keys is {@code null}
+     * @throws NullPointerException       if one of the given keys is {@code null}
      */
-    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
+    // note, this method must be kept if super#fetch(...) is removed
+    @SuppressWarnings("deprecation")
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);

Review comment:
       These methods were introduced when adding Duration/Instant support https://github.com/apache/kafka/pull/5682.
   
   I don't think these are needed; we can make a change similar to the one for the SessionStore read operations. wdyt?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -419,13 +504,13 @@ Long minTime() {
         }

Review comment:
       For WindowStore, only the time-based index is iterated backward. The KIP didn't consider reversing the key/value stores internally.
   
   We would need another flag (apart from backward) to define the order of the internal keys, which is cumbersome, and the order between keys doesn't matter much or can be computed by the user.
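
To make the ordering concrete, here is a rough sketch of the behavior described above. It is a simplified model, not the actual `InMemoryWindowStore` code: the class `BackwardTimeOrderSketch`, the method `scan`, and the `key@windowStart` output format are all hypothetical. Only the outer, time-based map is reversed; keys inside each window keep their ascending order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class BackwardTimeOrderSketch {

    // Hypothetical model: window start time -> (key -> value), both sorted ascending.
    static List<String> scan(final NavigableMap<Long, NavigableMap<String, String>> windows,
                             final boolean forward) {
        final List<String> visited = new ArrayList<>();
        // Only the time-based index is reversed for a backward scan.
        final NavigableMap<Long, NavigableMap<String, String>> byTime =
            forward ? windows : windows.descendingMap();
        for (final Map.Entry<Long, NavigableMap<String, String>> window : byTime.entrySet()) {
            // Keys inside a window are always visited in ascending order.
            for (final Map.Entry<String, String> kv : window.getValue().entrySet()) {
                visited.add(kv.getKey() + "@" + window.getKey());
            }
        }
        return visited;
    }

    public static void main(final String[] args) {
        final NavigableMap<Long, NavigableMap<String, String>> windows = new TreeMap<>();
        windows.computeIfAbsent(10L, t -> new TreeMap<>()).put("a", "1");
        windows.computeIfAbsent(10L, t -> new TreeMap<>()).put("b", "2");
        windows.computeIfAbsent(20L, t -> new TreeMap<>()).put("a", "3");
        System.out.println(scan(windows, true));  // [a@10, b@10, a@20]
        System.out.println(scan(windows, false)); // [a@20, a@10, b@10]
    }
}
```

A caller who also needs keys in descending order within a window could reverse each per-window group after reading it, which is why a second flag seems avoidable.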

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -426,7 +558,12 @@ private void getNextSegmentIterator() {
             setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());

Review comment:
       Will have to double-check this. I have inverted the current/last segment for the backward use case, though.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -72,22 +86,40 @@
             searchSpace.iterator(),

Review comment:
       `searchSpace` will be reversed based on the `forward` flag, in `AbstractSegments`.
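
A minimal sketch of that idea, assuming segments are kept in a sorted map keyed by segment id. The class `SegmentSearchSpaceSketch` and its methods are hypothetical stand-ins, not the real `AbstractSegments` API, and timestamps are assumed non-negative with `timeFrom <= timeTo`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class SegmentSearchSpaceSketch {
    // Hypothetical registry: segment id -> segment name, sorted by id.
    private final NavigableMap<Long, String> segments = new TreeMap<>();
    private final long segmentInterval;

    SegmentSearchSpaceSketch(final long segmentInterval) {
        this.segmentInterval = segmentInterval;
    }

    // Assumes non-negative timestamps.
    long segmentId(final long timestamp) {
        return timestamp / segmentInterval;
    }

    void addSegmentFor(final long timestamp) {
        final long id = segmentId(timestamp);
        segments.putIfAbsent(id, "segment-" + id);
    }

    // Returns the segments overlapping [timeFrom, timeTo], oldest first when
    // forward is true and newest first otherwise. Assumes timeFrom <= timeTo.
    List<String> searchSpace(final long timeFrom, final long timeTo, final boolean forward) {
        final NavigableMap<Long, String> range =
            segments.subMap(segmentId(timeFrom), true, segmentId(timeTo), true);
        return new ArrayList<>(forward ? range.values() : range.descendingMap().values());
    }

    public static void main(final String[] args) {
        final SegmentSearchSpaceSketch sketch = new SegmentSearchSpaceSketch(100L);
        sketch.addSegmentFor(50L);
        sketch.addSegmentFor(150L);
        sketch.addSegmentFor(250L);
        sketch.addSegmentFor(350L);
        System.out.println(sketch.searchSpace(100L, 299L, true));  // [segment-1, segment-2]
        System.out.println(sketch.searchSpace(100L, 299L, false)); // [segment-2, segment-1]
    }
}
```

Reversing the search space once, where the segments are selected, means the code that consumes `searchSpace.iterator()` does not need to know the direction.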

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -163,7 +164,17 @@ public void put(final Bytes key, final byte[] value, final long windowStartTimes
     @Deprecated
     @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
+        return fetch(key, timeFrom, timeTo, true);
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final Instant from, final Instant to) {
+        final long timeFrom = ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+        final long timeTo = ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));

Review comment:
       Only for backward compatibility. If it makes sense to remove these deprecated methods as part of this KIP, I'd be happy to help clean them up.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -337,25 +462,32 @@ public synchronized void close() {
 
         private CacheIteratorWrapper(final Bytes key,
                                      final long timeFrom,
-                                     final long timeTo) {
-            this(key, key, timeFrom, timeTo);
+                                     final long timeTo,
+                                     final boolean forward) {
+            this(key, key, timeFrom, timeTo, forward);
         }
 
         private CacheIteratorWrapper(final Bytes keyFrom,
                                      final Bytes keyTo,
                                      final long timeFrom,
-                                     final long timeTo) {
+                                     final long timeTo,
+                                     final boolean forward) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             this.timeTo = timeTo;
             this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
+            this.forward = forward;
 
             this.segmentInterval = cacheFunction.getSegmentInterval();
             this.currentSegmentId = cacheFunction.segmentId(timeFrom);

Review comment:
       Great catch! I think this hasn't popped up yet in the tests, as all tests may be using the same segment.
   
   Will double-check and add more tests to validate this.
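
To illustrate why a single-segment test would not surface a direction problem, here is a small sketch of the order in which segment ids would be visited. The helper `visitOrder` is hypothetical and only models the id arithmetic (`timestamp / segmentInterval`, non-negative timestamps assumed); it is not the `CacheIteratorWrapper` code.

```java
import java.util.ArrayList;
import java.util.List;

class SegmentWalkSketch {

    // Hypothetical helper: lists segment ids in the order an iterator would visit them.
    static List<Long> visitOrder(final long timeFrom,
                                 final long timeTo,
                                 final long segmentInterval,
                                 final boolean forward) {
        final long oldest = timeFrom / segmentInterval;
        final long newest = timeTo / segmentInterval;
        // Backward iteration swaps the roles: start at the newest segment, stop at the oldest.
        long current = forward ? oldest : newest;
        final long end = forward ? newest : oldest;
        final long step = forward ? 1 : -1;
        final List<Long> order = new ArrayList<>();
        while (forward ? current <= end : current >= end) {
            order.add(current);
            current += step;
        }
        return order;
    }

    public static void main(final String[] args) {
        System.out.println(visitOrder(0L, 250L, 100L, true));   // [0, 1, 2]
        System.out.println(visitOrder(0L, 250L, 100L, false));  // [2, 1, 0]
        // A range inside one segment looks the same in both directions.
        System.out.println(visitOrder(10L, 90L, 100L, false));  // [0]
    }
}
```

When the whole time range falls inside one segment, forward and backward visit the same single id, so a test needs timestamps spread over at least two segments to tell the two directions apart.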




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org