Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/18 07:20:54 UTC

[GitHub] [kafka] showuon commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

showuon commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r730618511



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -239,9 +254,31 @@ public V fetchSession(final K key, final long earliestSessionEndTime, final long
         );
     }
 
+    private long getObservedStreamTime(final StateStore stateStore) {
+        if (stateStore instanceof PersistentSessionStore) {
+            return ((PersistentSessionStore) stateStore).getObservedStreamTime();
+        } else if (stateStore instanceof WrappedStateStore) {
+            return getObservedStreamTime(((WrappedStateStore) stateStore).wrapped());
+        } else {
+            return ConsumerRecord.NO_TIMESTAMP;
+        }
+    }
+
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
+
+        if (wrapped().persistent()) {
+            final long actualEarliestSessionEndTime = getActualEarliestSessionEndTime(wrapped(), 0);
+            return new MeteredWindowedKeyValueIterator<>(
+                    wrapped().findSessions(
+                            keyBytes(key), actualEarliestSessionEndTime, Long.MAX_VALUE),
+                    fetchSensor,
+                    streamsMetrics,
+                    serdes,
+                    time);
+        }
+
         return new MeteredWindowedKeyValueIterator<>(

Review comment:
      I know this issue only aims to improve the non-in-memory (persistent) window stores, but do you think we could make them all return the `MeteredWindowedKeyValueIterator` with `actualEarliestSessionEndTime`?
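      To make the suggestion concrete, here is a minimal sketch of how an "actual earliest session end time" could be derived from the observed stream time alone, regardless of whether the store is persistent. It assumes a session counts as expired when its end time is at or before `observedStreamTime - retentionPeriod`, the same boundary the `MeteredWindowStore#fetch` hunk in this PR uses. `SessionRetentionSketch` and `actualEarliestSessionEndTime` are hypothetical names; this is not the PR's `getActualEarliestSessionEndTime`, whose body is not shown in this diff.

```java
// Hypothetical sketch for illustration only; these names are not Kafka Streams APIs.
public final class SessionRetentionSketch {

    // Same sentinel value as ConsumerRecord.NO_TIMESTAMP (-1L), which the diff
    // returns when no PersistentSessionStore is found while unwrapping.
    static final long NO_TIMESTAMP = -1L;

    private SessionRetentionSketch() { }

    // Assumption: a session whose end time is <= observedStreamTime - retentionPeriod
    // is expired, so the earliest end time worth fetching is one past that bound.
    static long actualEarliestSessionEndTime(final long requestedEarliestEndTime,
                                             final long observedStreamTime,
                                             final long retentionPeriod) {
        if (observedStreamTime == NO_TIMESTAMP) {
            // No records observed yet, so nothing can have expired.
            return requestedEarliestEndTime;
        }
        final long earliestLiveEndTime = observedStreamTime - retentionPeriod + 1;
        // Never widen the caller's range; only narrow it to exclude expired sessions.
        return Math.max(requestedEarliestEndTime, earliestLiveEndTime);
    }

    public static void main(final String[] args) {
        // fetch(key) in the diff passes 0 as the requested lower bound.
        System.out.println(actualEarliestSessionEndTime(0L, 100L, 30L));         // 71
        System.out.println(actualEarliestSessionEndTime(0L, NO_TIMESTAMP, 30L)); // 0
    }
}
```

      Because the bound depends only on the observed stream time and the retention period, the same clamped value could be passed to `findSessions` for in-memory stores too, provided they can also report their observed stream time.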

Review comment:
      The same comment applies to the similar places below.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
##########
@@ -60,7 +103,8 @@ public void shouldRemoveExpired() {
         try (final KeyValueIterator<Windowed<String>, Long> iterator =
             sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
         ) {
-            assertEquals(valuesToSet(iterator), new HashSet<>(Arrays.asList(2L, 3L, 4L)));
+            assertEquals(valuesToSet(iterator), new HashSet<>(Collections.singletonList(4L)));

Review comment:
      Thanks for adding the comments.
   So the comment on line 100 is no longer valid (or should be updated), right?
   // Advance stream time to expire the first record

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -201,6 +205,9 @@ public V fetch(final K key,
         Objects.requireNonNull(key, "key cannot be null");
         return maybeMeasureLatency(
             () -> {
+                if (wrapped().persistent() && timestamp <= getObservedStreamTime(wrapped()) - retentionPeriod) {
+                    return null;

Review comment:
      I think the `wrapped().persistent()` check can be removed. This logic should apply to all window stores, right?
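      A minimal sketch of what the check looks like once it applies to every window store, using a toy single-key in-memory store. `StrictRetentionWindowSketch` is a hypothetical class, not part of Kafka Streams. It keeps the diff's condition `timestamp <= observedStreamTime - retentionPeriod` and only drops the `wrapped().persistent()` guard, so an expired record is hidden even though the map still holds it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy store for illustration only; not a Kafka Streams class.
public final class StrictRetentionWindowSketch {

    // Same sentinel value as ConsumerRecord.NO_TIMESTAMP (-1L).
    static final long NO_TIMESTAMP = -1L;

    private final long retentionPeriod;
    // Single key for brevity: window timestamp -> value.
    private final Map<Long, String> valuesByTimestamp = new HashMap<>();
    private long observedStreamTime = NO_TIMESTAMP;

    StrictRetentionWindowSketch(final long retentionPeriod) {
        this.retentionPeriod = retentionPeriod;
    }

    void put(final long timestamp, final String value) {
        // Observed stream time only moves forward.
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        valuesByTimestamp.put(timestamp, value);
    }

    // The diff's condition without the wrapped().persistent() guard: a record at or
    // before observedStreamTime - retentionPeriod is reported as absent, even though
    // this toy store never physically deletes it.
    String fetch(final long timestamp) {
        if (timestamp <= observedStreamTime - retentionPeriod) {
            return null;
        }
        return valuesByTimestamp.get(timestamp);
    }

    public static void main(final String[] args) {
        final StrictRetentionWindowSketch store = new StrictRetentionWindowSketch(10L);
        store.put(0L, "a");
        store.put(5L, "b");
        store.put(12L, "c"); // observed stream time is now 12, so the bound is 12 - 10 = 2
        System.out.println(store.fetch(0L));  // null (expired)
        System.out.println(store.fetch(5L));  // b
        System.out.println(store.fetch(12L)); // c
    }
}
```

      Keeping the check in the metered layer would presumably make point lookups behave the same for persistent and in-memory stores, instead of depending on when the underlying store physically drops old data.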




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
