You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/08 02:38:36 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

vvcephei commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r501351457



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -382,9 +479,23 @@ private boolean setInnerIterators() {
                 currentKey = nextKeyEntry.getKey();
 
                 if (latestSessionStartTime == Long.MAX_VALUE) {
-                    recordIterator = nextKeyEntry.getValue().entrySet().iterator();
+                    final Set<Entry<Long, byte[]>> entries;
+                    if (forward) entries = nextKeyEntry.getValue().descendingMap().entrySet();
+                    else entries = nextKeyEntry.getValue().entrySet();

Review comment:
       The code style discourages inline conditionals. It's more maintainable to always use blocks.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
##########
@@ -24,35 +24,156 @@
  * Implementations should be thread-safe as concurrent reads and writes
  * are expected.
  *
- * @param <K> the key type
+ * @param <K>   the key type
  * @param <AGG> the aggregated value type
  */
 public interface ReadOnlySessionStore<K, AGG> {
+
     /**
-     * Retrieve all aggregated sessions for the provided key.
+     * Fetch any sessions with the matching key and the sessions end is &ge; earliestSessionEndTime and the sessions
+     * start is &le; latestSessionStartTime iterating from earliest to latest.
+     * <p>
      * This iterator must be closed after use.
      *
+     * @param key                    the key to return sessions for
+     * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts.
+     * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends.
+     * @return iterator of sessions with the matching key and aggregated values, from earliest to latest session time.
+     * @throws NullPointerException If null is used for key.
+     */
+    default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+                                                            final long earliestSessionEndTime,
+                                                            final long latestSessionStartTime) {
+        throw new UnsupportedOperationException("Moved from SessionStore");

Review comment:
       It won't matter to users whether this method was moved from another interface or not. They just need to know why they're getting the exception. I.e., we just need to tell them that the store implementation they selected didn't implement the method.
   
   ```suggestion
           throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore.");
   ```
   
   We should say the exact same thing in all default implementations. Right now, they're a bit inconsistent.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -150,22 +149,44 @@ public void remove(final Windowed<Bytes> sessionKey) {
         validateStoreOpen();
 
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = wrapped().persistent() ?
-            new CacheIteratorWrapper(key, earliestSessionEndTime, latestSessionStartTime) :
+            new CacheIteratorWrapper(key, earliestSessionEndTime, latestSessionStartTime, true) :
             context.cache().range(cacheName,
-                        cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)),
-                        cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime))
+                cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)),
+                cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime))

Review comment:
       There are a lot of unnecessary whitespace changes in this PR. You don't need to back them all out right now, but in the future, please clean up the diff before submitting a PR. These extra changes make it harder to review.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org