You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/04 05:09:35 UTC

[kafka] branch 3.3 updated: KAFKA-14864: Close iterator in KStream windowed aggregation emit on window close (#13470)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new abb2fab4898 KAFKA-14864: Close iterator in KStream windowed aggregation emit on window close (#13470)
abb2fab4898 is described below

commit abb2fab489896b52a1404b2ddb65c88374fcc155
Author: Victoria Xia <vi...@confluent.io>
AuthorDate: Tue Apr 4 00:29:40 2023 -0400

    KAFKA-14864: Close iterator in KStream windowed aggregation emit on window close (#13470)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 ...bstractKStreamTimeWindowAggregateProcessor.java | 31 +++++++++++-----------
 .../internals/KStreamSlidingWindowAggregate.java   |  9 ++++---
 2 files changed, 22 insertions(+), 18 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
index a081a280baf..dfef0d4aaae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
@@ -200,22 +200,23 @@ public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
                               final long emitRangeUpperBound) {
         final long startMs = time.milliseconds();
 
-        final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowToEmit = windowStore
-            .fetchAll(emitRangeLowerBound, emitRangeUpperBound);
-
-        int emittedCount = 0;
-        while (windowToEmit.hasNext()) {
-            emittedCount++;
-            final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv = windowToEmit.next();
-
-            tupleForwarder.maybeForward(
-                record.withKey(kv.key)
-                    .withValue(new Change<>(kv.value.value(), null))
-                    .withTimestamp(kv.value.timestamp())
-                    .withHeaders(record.headers()));
+        try (final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowToEmit
+                 = windowStore.fetchAll(emitRangeLowerBound, emitRangeUpperBound)) {
+
+            int emittedCount = 0;
+            while (windowToEmit.hasNext()) {
+                emittedCount++;
+                final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv = windowToEmit.next();
+
+                tupleForwarder.maybeForward(
+                    record.withKey(kv.key)
+                        .withValue(new Change<>(kv.value.value(), null))
+                        .withTimestamp(kv.value.timestamp())
+                        .withHeaders(record.headers()));
+            }
+            emittedRecordsSensor.record(emittedCount);
+            emitFinalLatencySensor.record(time.milliseconds() - startMs);
         }
-        emittedRecordsSensor.record(emittedCount);
-        emitFinalLatencySensor.record(time.milliseconds() - startMs);
 
         lastEmitWindowCloseTime = windowCloseTime;
         internalProcessorContext.addProcessorMetadataKeyValue(storeName, windowCloseTime);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index aa0841a38f0..e75427d6b89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashSet;
@@ -122,9 +123,11 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
 
             if (reverseIteratorPossible == null) {
                 try {
-                    windowStore.backwardFetch(record.key(), 0L, 0L);
-                    reverseIteratorPossible = true;
-                    log.debug("Sliding Windows aggregate using a reverse iterator");
+                    try (final WindowStoreIterator<ValueAndTimestamp<VAgg>> iterator
+                             = windowStore.backwardFetch(record.key(), 0L, 0L)) {
+                        reverseIteratorPossible = true;
+                        log.debug("Sliding Windows aggregate using a reverse iterator");
+                    }
                 } catch (final UnsupportedOperationException e)  {
                     reverseIteratorPossible = false;
                     log.debug("Sliding Windows aggregate using a forward iterator");