You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/07 16:24:23 UTC

[GitHub] [kafka] jeqo opened a new pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

jeqo opened a new pull request #9139:
URL: https://github.com/apache/kafka/pull/9139


   Depends on https://github.com/apache/kafka/pull/9138
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-705319349


   Oh, ironically, there is a merge conflict with trunk, and it's due to one of the only "extra" changes I left in: renaming Caching*StoreTest to Caching*StoreTest and in the Session variant, making `context` a local variable.
   
   I didn't un-rename that class, but I did restore `context` as a field. I copied over the newly-added test in trunk just to the InMemory variants of the tests. They only mock the inner store and test the init method anyway. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-691045209






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r501346943



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -359,7 +431,11 @@ private void getNextSegmentIterator() {
             setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());
 
             current.close();
-            current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+            if (forward) {

Review comment:
       Ohh yeah good catch. Kind of weird that we're inconsistent with which underlying store type is used in the existing caching store tests, too. But thanks for adding the in-memory + persistent flavors for the SessionStore, I think it sounds reasonable to file a ticket to follow up on the Window/KV flavors and clean up the caching tests




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-705556429


   Ok, it's merged. I'm waiting on permission from Bill before cherry-picking to 2.7


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-705241380


   test this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-705240588


   Retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r501336253



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -359,7 +431,11 @@ private void getNextSegmentIterator() {
             setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());
 
             current.close();
-            current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+            if (forward) {

Review comment:
       One of the reasons these missing pieces are not throwing exceptions seems to be that Caching Store tests are only covering or InMemory or Persistent underlying stores. This means only one path of the caching logic is followed (e.g. `persistent() ? iteratorWraper : otherIterator`)
   
   I rename the test classes to be explicit about what backing store is covered. We could probably cover this as another issue?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-705319493


   Jenkins is running again, so I'll let it finish and merge in the morning.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r499996879



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -359,7 +431,11 @@ private void getNextSegmentIterator() {
             setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());
 
             current.close();
-            current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+            if (forward) {
+                current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+            } else {
+                current = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo);
+            }
         }
 
         private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRangeEndTime) {

Review comment:
       Can you fix the `keyFrom == keyTo` to use `.equals` on the side (down on line 370)

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -359,7 +431,11 @@ private void getNextSegmentIterator() {
             setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());
 
             current.close();
-            current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+            if (forward) {

Review comment:
       I think we're going to need some additional changes in this class similar to what we had in CachingWindowStore. Definitely at least in `getNextSegmentIterator()`. Let's make sure to have some cross-segment test coverage here as well, especially because the iteration logic of session store range queries is the hardest to wrap your head around out of all the stores (at least, it is for me)

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -201,7 +247,26 @@ public void remove(final Windowed<Bytes> sessionKey) {
 
         removeExpiredSegments();
 
-        return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.entrySet().iterator());
+        return registerNewIterator(
+            key,
+            key,
+            Long.MAX_VALUE, endTimeMap.entrySet().iterator(),

Review comment:
       missing newline

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -382,9 +478,20 @@ private boolean setInnerIterators() {
                 currentKey = nextKeyEntry.getKey();
 
                 if (latestSessionStartTime == Long.MAX_VALUE) {
-                    recordIterator = nextKeyEntry.getValue().entrySet().iterator();
+                    final Set<Entry<Long, byte[]>> entries;
+                    if (forward) entries = nextKeyEntry.getValue().descendingMap().entrySet();
+                    else entries = nextKeyEntry.getValue().entrySet();
+                    recordIterator = entries.iterator();
                 } else {
-                    recordIterator = nextKeyEntry.getValue().headMap(latestSessionStartTime, true).entrySet().iterator();
+                    final Set<Entry<Long, byte[]>> entries;
+                    if (forward) entries = nextKeyEntry.getValue()

Review comment:
       If/else needs brackets




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r500889276



##########
File path: streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
##########
@@ -82,11 +118,15 @@ public boolean hasNext() {
                 public KeyValue<Windowed<K>, V> next() {
                     return it.next();
                 }
-
             }
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to) {
+        return null;

Review comment:
       mixed choice here. hope is good enough 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9139:
URL: https://github.com/apache/kafka/pull/9139


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-705319349






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-705249059


   Alright Jenkins is being wonky yet again, but I checked out this branch and ran the tests locally to verify.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9139:
URL: https://github.com/apache/kafka/pull/9139


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r501351457



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -382,9 +479,23 @@ private boolean setInnerIterators() {
                 currentKey = nextKeyEntry.getKey();
 
                 if (latestSessionStartTime == Long.MAX_VALUE) {
-                    recordIterator = nextKeyEntry.getValue().entrySet().iterator();
+                    final Set<Entry<Long, byte[]>> entries;
+                    if (forward) entries = nextKeyEntry.getValue().descendingMap().entrySet();
+                    else entries = nextKeyEntry.getValue().entrySet();

Review comment:
       The code style discourages inline conditionals. It's more maintainable to always use blocks.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
##########
@@ -24,35 +24,156 @@
  * Implementations should be thread-safe as concurrent reads and writes
  * are expected.
  *
- * @param <K> the key type
+ * @param <K>   the key type
  * @param <AGG> the aggregated value type
  */
 public interface ReadOnlySessionStore<K, AGG> {
+
     /**
-     * Retrieve all aggregated sessions for the provided key.
+     * Fetch any sessions with the matching key and the sessions end is &ge; earliestSessionEndTime and the sessions
+     * start is &le; latestSessionStartTime iterating from earliest to latest.
+     * <p>
      * This iterator must be closed after use.
      *
+     * @param key                    the key to return sessions for
+     * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts.
+     * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends.
+     * @return iterator of sessions with the matching key and aggregated values, from earliest to latest session time.
+     * @throws NullPointerException If null is used for key.
+     */
+    default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+                                                            final long earliestSessionEndTime,
+                                                            final long latestSessionStartTime) {
+        throw new UnsupportedOperationException("Moved from SessionStore");

Review comment:
       It won't matter to users whether this method was moved from another interface or not. They just need to know why they're getting the exception. I.e., we just need to tell them that the store implementation they selected didn't implement the method.
   
   ```suggestion
           throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore.");
   ```
   
   We should say the exact same thing in all default implementations. Right now, they're a bit inconsistent.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -150,22 +149,44 @@ public void remove(final Windowed<Bytes> sessionKey) {
         validateStoreOpen();
 
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = wrapped().persistent() ?
-            new CacheIteratorWrapper(key, earliestSessionEndTime, latestSessionStartTime) :
+            new CacheIteratorWrapper(key, earliestSessionEndTime, latestSessionStartTime, true) :
             context.cache().range(cacheName,
-                        cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)),
-                        cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime))
+                cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)),
+                cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime))

Review comment:
       There are a lot of unnecessary whitespace changes in this PR. You don't need to back them all out right now, but in the future, please clean up the diff before submitting a PR. These extra changes make it harder to review.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-691045209


   cc @ableegoldman and team, this should be ready to review. 
   Would also be great to look #9228 as part of KIP-666, to get the APIs aligned. 
   
   Thanks!  


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r501334888



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -270,25 +335,32 @@ public void close() {
 
         private CacheIteratorWrapper(final Bytes key,
                                      final long earliestSessionEndTime,
-                                     final long latestSessionStartTime) {
-            this(key, key, earliestSessionEndTime, latestSessionStartTime);
+                                     final long latestSessionStartTime,
+                                     final boolean forward) {
+            this(key, key, earliestSessionEndTime, latestSessionStartTime, forward);
         }
 
         private CacheIteratorWrapper(final Bytes keyFrom,
                                      final Bytes keyTo,
                                      final long earliestSessionEndTime,
-                                     final long latestSessionStartTime) {
+                                     final long latestSessionStartTime,
+                                     final boolean forward) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             this.latestSessionStartTime = latestSessionStartTime;
             this.lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp);
             this.segmentInterval = cacheFunction.getSegmentInterval();
+            this.forward = forward;
 
             this.currentSegmentId = cacheFunction.segmentId(earliestSessionEndTime);

Review comment:
       right! great catch! forgot to align this with WindowStore.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r500664927



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
##########
@@ -278,6 +326,23 @@ public void shouldFetchCorrectlyAcrossSegments() {
         assertFalse(results.hasNext());
     }
 
+    @Test
+    public void shouldBackwardFetchCorrectlyAcrossSegments() {
+        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+        cachingStore.put(a1, "1".getBytes());
+        cachingStore.put(a2, "2".getBytes());
+        cachingStore.flush();
+        cachingStore.put(a3, "3".getBytes());

Review comment:
       Can we add a few more records that span multiple segments that don't get flushed as well?

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
##########
@@ -301,6 +366,29 @@ public void shouldFetchRangeCorrectlyAcrossSegments() {
         assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);
     }
 
+    @Test
+    public void shouldBackwardFetchRangeCorrectlyAcrossSegments() {
+        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+        cachingStore.put(a1, "1".getBytes());
+        cachingStore.put(aa1, "1".getBytes());
+        cachingStore.put(a2, "2".getBytes());
+        cachingStore.put(a3, "3".getBytes());
+        cachingStore.put(aa3, "3".getBytes());
+
+        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults =
+            cachingStore.backwardFindSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2);
+        final Set<Windowed<Bytes>> keys = new HashSet<>();
+        while (rangeResults.hasNext()) {
+            keys.add(rangeResults.next().key);
+        }
+        rangeResults.close();
+        assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);

Review comment:
       We're losing the ordering check by comparing this as a set, let's use a list (or whatever) to verify the actual order

##########
File path: streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
##########
@@ -82,11 +118,15 @@ public boolean hasNext() {
                 public KeyValue<Windowed<K>, V> next() {
                     return it.next();
                 }
-
             }
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to) {
+        return null;

Review comment:
       I guess it probably doesn't matter since we presumably aren't using these backward methods of the ReadOnlySessionStoreStub, but it seems like it might result in some tricky NPEs to debug if ever someone does try to use it in a test. If you don't feel like implementing it I think it's fine to just throw UnsupportedOperationException and say that you'll have to implement this to use it.
   
   Or just copy the code from the forward direction and flip it 🤷‍♀️  Same goes for all the methods in here that return null

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
##########
@@ -456,68 +562,88 @@ public void shouldClearNamespaceCacheOnClose() {
         assertEquals(0, cache.size());
     }
 
-    @Test(expected = InvalidStateStoreException.class)
+    @Test
     public void shouldThrowIfTryingToFetchFromClosedCachingStore() {
         cachingStore.close();
-        cachingStore.fetch(keyA);
+        assertThrows(InvalidStateStoreException.class, () -> cachingStore.fetch(keyA));
     }
 
-    @Test(expected = InvalidStateStoreException.class)
+    @Test
     public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() {
         cachingStore.close();
-        cachingStore.findSessions(keyA, 0, Long.MAX_VALUE);
+        assertThrows(InvalidStateStoreException.class, () -> cachingStore.findSessions(keyA, 0, Long.MAX_VALUE));
     }
 
-    @Test(expected = InvalidStateStoreException.class)
+    @Test
     public void shouldThrowIfTryingToRemoveFromClosedCachingStore() {
         cachingStore.close();
-        cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0)));
+        assertThrows(InvalidStateStoreException.class, () -> cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0))));
     }
 
-    @Test(expected = InvalidStateStoreException.class)
+    @Test
     public void shouldThrowIfTryingToPutIntoClosedCachingStore() {
         cachingStore.close();
-        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes());
+        assertThrows(InvalidStateStoreException.class, () -> cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() {
-        cachingStore.findSessions(null, 1L, 2L);
+        assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, 1L, 2L));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() {
-        cachingStore.findSessions(null, keyA, 1L, 2L);
+        assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, keyA, 1L, 2L));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() {
-        cachingStore.findSessions(keyA, null, 1L, 2L);
+        assertThrows(NullPointerException.class, () -> cachingStore.findSessions(keyA, null, 1L, 2L));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnFetchNullFromKey() {
-        cachingStore.fetch(null, keyA);
+        assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, keyA));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnFetchNullToKey() {
-        cachingStore.fetch(keyA, null);
+        assertThrows(NullPointerException.class, () -> cachingStore.fetch(keyA, null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnFetchNullKey() {
-        cachingStore.fetch(null);
+        assertThrows(NullPointerException.class, () -> cachingStore.fetch(null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnRemoveNullKey() {
-        cachingStore.remove(null);
+        assertThrows(NullPointerException.class, () -> cachingStore.remove(null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerExceptionOnPutNullKey() {
-        cachingStore.put(null, "1".getBytes());
+        assertThrows(NullPointerException.class, () -> cachingStore.put(null, "1".getBytes()));
+    }
+
+    @Test
+    public void shouldNotThrowInvalidBackwardRangeExceptionWithNegativeFromKey() {

Review comment:
       Technically "InvalidRangeException" was just the name of the exception that could get thrown, there is no "InvalidBackwardRangeException" that I know of 😛  But I think the meaning is clear enough lol




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
jeqo commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-691045209






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-705564790


   cherry-picked to 2.7. Thanks @jeqo !


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r500658560



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -270,25 +335,32 @@ public void close() {
 
         private CacheIteratorWrapper(final Bytes key,
                                      final long earliestSessionEndTime,
-                                     final long latestSessionStartTime) {
-            this(key, key, earliestSessionEndTime, latestSessionStartTime);
+                                     final long latestSessionStartTime,
+                                     final boolean forward) {
+            this(key, key, earliestSessionEndTime, latestSessionStartTime, forward);
         }
 
         private CacheIteratorWrapper(final Bytes keyFrom,
                                      final Bytes keyTo,
                                      final long earliestSessionEndTime,
-                                     final long latestSessionStartTime) {
+                                     final long latestSessionStartTime,
+                                     final boolean forward) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             this.latestSessionStartTime = latestSessionStartTime;
             this.lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp);
             this.segmentInterval = cacheFunction.getSegmentInterval();
+            this.forward = forward;
 
             this.currentSegmentId = cacheFunction.segmentId(earliestSessionEndTime);

Review comment:
       Ok I _think_ that for the reverse case, this should be initialized to `cacheFunction.segmentId(maxObservedTimestamp)` and `lastSegmentId` should be initialized to this (`segmentId(earliestSessionEndTime)`). 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-705311946


   Ok, I've pushed a couple of commits addressing my feedback.
   
   I absolutely share the desire to clean up bad formatting in the codebase, but when PRs are this extensive, I'd suggest eliminating absolutely all changes that aren't directly related to the change. The extra whitespace changes, etc., just add noise that makes it harder for reviewers to do their job. Plus, it increases the probability of merge conflicts.
   
   I'd encourage sending a separate PR applying formatting changes or other style fixes. This is what I do myself.
   
   Anyway, I rolled back the "extra" stuff. I also applied a couple of extra formatting changes to make the new code itself comply with the style guidelines.
   
   And it looks like Jenkins finally woke up, so I'll let it go ahead and finish running.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org