Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/10 08:49:34 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

ableegoldman commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r706010750



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -292,13 +408,46 @@ public V fetch(final K key,
             time);
     }
 
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }
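
The retention check in `filterExpiredRecords()` and `getActualWindowStartTime()` comes down to simple arithmetic on epoch milliseconds. A window is dropped when its end time falls before `observedStreamTime - retentionPeriod + 1`. The sketch below isolates that logic. It assumes a hypothetical `WindowedEntry` record in place of `Windowed<Bytes>`, and plain `long` timestamps in place of `Instant`. It illustrates the arithmetic and is not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;

public class RetentionFilterSketch {
    // Same value as ConsumerRecord.NO_TIMESTAMP (-1L): no record observed yet.
    static final long NO_TIMESTAMP = -1L;

    // Hypothetical simplified stand-in for Windowed<K>: a key plus window bounds in epoch millis.
    record WindowedEntry(String key, long windowStart, long windowEnd) {}

    // Earliest timestamp still inside the retention period: observedStreamTime - retentionPeriod + 1.
    static long windowStartBoundary(long observedStreamTime, long retentionPeriod) {
        return observedStreamTime - retentionPeriod + 1;
    }

    // Mirrors getActualWindowStartTime(): raise a requested lower bound to the retention boundary.
    static long actualWindowStartTime(long timeFrom, long observedStreamTime, long retentionPeriod) {
        return Math.max(timeFrom, windowStartBoundary(observedStreamTime, retentionPeriod));
    }

    // Mirrors filterExpiredRecords(): keep windows whose end is not before the boundary.
    static List<WindowedEntry> filterExpired(List<WindowedEntry> all, long observedStreamTime, long retentionPeriod) {
        if (observedStreamTime == NO_TIMESTAMP) {
            return all; // nothing observed yet, so nothing can be expired
        }
        long boundary = windowStartBoundary(observedStreamTime, retentionPeriod);
        List<WindowedEntry> inBoundary = new ArrayList<>();
        for (WindowedEntry entry : all) {
            if (entry.windowEnd() < boundary) {
                continue; // expired: the window ended before the retention boundary
            }
            inBoundary.add(entry);
        }
        return inBoundary;
    }

    public static void main(String[] args) {
        List<WindowedEntry> all = List.of(
            new WindowedEntry("a", 0, 10),
            new WindowedEntry("b", 40, 50),
            new WindowedEntry("c", 90, 100));
        System.out.println(windowStartBoundary(100, 50));        // 51
        System.out.println(filterExpired(all, 100, 50).size());  // 1 (only "c")
        System.out.println(actualWindowStartTime(0, 100, 50));   // 51
    }
}
```

With `observedStreamTime = 100` and `retentionPeriod = 50`, the boundary is 51, so only the window ending at 100 survives. Comparing `long` values with `<` is equivalent to the diff's `endTime().isBefore(Instant.ofEpochMilli(...))`.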

Review comment:
   > it's failing for test cases like shouldNotThrowConcurrentModificationException. This seems to be because the put() call while iterating...
   
   You don't mean that the test itself is failing, just that the filter isn't being applied to the `put` records, right?
   > ...the put() call while iterating is appending to the wrapped instance of iterator and hence it's not visible
   
   Can you also expand a bit on what you mean by this? I'm not sure I have the whole picture here since I haven't been following this too closely. Thanks in advance for catching me up 🙂
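
One plausible reading of the visibility issue, which is an assumption since the thread doesn't spell it out, starts from how `filterExpiredRecords()` returns its results. It drains the wrapped iterator into an `ArrayList` first, so the caller iterates over a copy. A `put()` made mid-iteration lands in the underlying store, but the copy never sees it. Because the copy itself is never modified, no `ConcurrentModificationException` is thrown either. The sketch uses a `TreeMap` as a hypothetical stand-in for the wrapped store.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EagerSnapshotSketch {
    // Iterates over an eager copy of the store and calls put() partway through.
    // Returns how many entries the iteration actually saw.
    static int iterateWithPut(TreeMap<Long, String> store) {
        // Drain the underlying entries into a list up front, as filterExpiredRecords() does.
        List<Map.Entry<Long, String>> copy = new ArrayList<>();
        for (Map.Entry<Long, String> e : store.entrySet()) {
            copy.add(Map.entry(e.getKey(), e.getValue()));
        }

        int seen = 0;
        Iterator<Map.Entry<Long, String>> it = copy.iterator();
        while (it.hasNext()) {
            it.next();
            seen++;
            if (seen == 1) {
                // This write goes to the underlying store, not to the copy being iterated,
                // so it is never returned and no ConcurrentModificationException is thrown.
                store.put(30L, "c");
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> store = new TreeMap<>();
        store.put(10L, "a");
        store.put(20L, "b");
        System.out.println(iterateWithPut(store)); // 2
        System.out.println(store.size());          // 3
    }
}
```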
   
   > Looking at this, do you think it would be a good idea to move this logic into the actual RocksDB implementations? Or do you think there's a better way to do it here in the MeteredStore class itself?
   
   Personally, I think the benefits of keeping this in the metering layer outweigh any downsides, especially if it comes down to just some weird edge case(s) that don't play nicely with the filtering. But I'm also interested in hearing what @showuon makes of this.
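
If the filtering stays in the metering layer, one option is to apply it lazily instead of copying everything into a list first. The hypothetical `FilteringIterator` below skips rejected elements on demand. Whatever visibility guarantees the wrapped iterator has are therefore passed through unchanged. This is a generic sketch over `java.util.Iterator`. A real version would also need the `peekNextKey()` and `close()` methods of Kafka Streams' `KeyValueIterator`, which it omits.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

public class LazyFilterSketch {
    // Hypothetical lazily filtering wrapper: evaluates the predicate as elements are
    // requested instead of draining the underlying iterator up front.
    static final class FilteringIterator<T> implements Iterator<T> {
        private final Iterator<T> inner;
        private final Predicate<T> keep;
        private T nextItem;
        private boolean hasBuffered;

        FilteringIterator(Iterator<T> inner, Predicate<T> keep) {
            this.inner = inner;
            this.keep = keep;
        }

        @Override
        public boolean hasNext() {
            // Advance the inner iterator until an accepted element is buffered or it runs out.
            while (!hasBuffered && inner.hasNext()) {
                T candidate = inner.next();
                if (keep.test(candidate)) {
                    nextItem = candidate;
                    hasBuffered = true;
                }
            }
            return hasBuffered;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            hasBuffered = false;
            T result = nextItem;
            nextItem = null;
            return result;
        }
    }

    public static void main(String[] args) {
        long boundary = 51; // e.g. observedStreamTime 100, retentionPeriod 50
        List<Long> windowEnds = List.of(10L, 50L, 100L, 120L);
        Iterator<Long> it = new FilteringIterator<>(windowEnds.iterator(), end -> end >= boundary);
        while (it.hasNext()) {
            System.out.println(it.next()); // prints 100, then 120
        }
    }
}
```

The trade-off is that the lazy form avoids holding every record in memory, but it no longer shields the caller from the wrapped iterator's behavior under concurrent writes. That is presumably the kind of case a test named shouldNotThrowConcurrentModificationException exercises.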



