Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/04 07:51:00 UTC

[GitHub] [kafka] patrickstuedi commented on a change in pull request #11234: KAFKA-13212: add support infinite query for session store

patrickstuedi commented on a change in pull request #11234:
URL: https://github.com/apache/kafka/pull/11234#discussion_r720243284



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -212,18 +216,15 @@ public void remove(final Windowed<Bytes> sessionKey) {
                                                                   final Bytes keyTo,
                                                                   final long earliestSessionEndTime,
                                                                   final long latestSessionStartTime) {
-        if (keyFrom.compareTo(keyTo) > 0) {
-            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
-                "This may be due to range arguments set in the wrong order, " +
-                "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
-                "Note that the built-in numerical serdes do not follow this for negative numbers");
+        if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
+            LOG.warn(INVALID_RANGE_WARN_MSG);
             return KeyValueIterators.emptyIterator();
         }
 
         validateStoreOpen();
 
-        final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime));
-        final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime));
+        final Bytes cacheKeyFrom = keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime));

Review comment:
       Just curious: would it make sense to have cacheFunction handle null keys itself (returning null in that case)? What do you think?
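
To make the suggestion concrete, here is a minimal sketch of a cache function that passes null keys through. It is not Kafka's actual implementation: the class name NullTolerantCacheFunction, the byte[] key type, and the segment-id prefix layout are all assumptions chosen for illustration. Note that in the diff above the argument to cacheKey comes from keySchema.lowerRange(keyFrom, ...), so that call would presumably need the same null tolerance before the ternary at the call site could be dropped.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the store's cache function; not Kafka's actual class.
final class NullTolerantCacheFunction {
    private final long segmentInterval;

    NullTolerantCacheFunction(final long segmentInterval) {
        this.segmentInterval = segmentInterval;
    }

    // A null key stands for an open-ended range bound, so it maps to null
    // instead of triggering a NullPointerException.
    byte[] cacheKey(final byte[] key, final long timestamp) {
        if (key == null) {
            return null;
        }
        // Assumed layout for illustration: 8-byte segment id followed by the key bytes.
        final long segmentId = timestamp / segmentInterval;
        return ByteBuffer.allocate(Long.BYTES + key.length)
            .putLong(segmentId)
            .put(key)
            .array();
    }
}
```

With a helper shaped like this, callers could pass a possibly-null bound straight through rather than repeating the null check at each call site.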

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -351,10 +345,8 @@ public V fetchSession(final K key, final long earliestSessionEndTime, final long
                                                                  final K keyTo,
                                                                  final long earliestSessionEndTime,
                                                                  final long latestSessionStartTime) {
-        Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
-        Objects.requireNonNull(keyTo, "keyTo cannot be null");
-        final Bytes bytesKeyFrom = keyBytes(keyFrom);
-        final Bytes bytesKeyTo = keyBytes(keyTo);
+        final Bytes bytesKeyFrom = keyFrom == null ? null : keyBytes(keyFrom);

Review comment:
       Similarly here: would it make sense to integrate that null check into keyBytes? I think there are similar cases in other stores.
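
As a rough sketch of what folding the check into keyBytes might look like: the class below is a hypothetical stand-in, not the metered store's real code, and it assumes the key serializer can be modelled as a plain Function from K to byte[].

```java
import java.util.function.Function;

// Hypothetical stand-in for a metered store's key serialization helper; not Kafka's actual code.
final class NullTolerantKeyBytes<K> {
    private final Function<K, byte[]> keySerializer;

    NullTolerantKeyBytes(final Function<K, byte[]> keySerializer) {
        this.keySerializer = keySerializer;
    }

    // A null key means "no bound on this side of the range",
    // so it is passed through without being serialized.
    byte[] keyBytes(final K key) {
        return key == null ? null : keySerializer.apply(key);
    }
}
```

One trade-off to weigh: if the same helper also serves single-key lookups, those call sites would presumably still need their own explicit null check, since a null key there is an error and not an open bound.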



