You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/04 04:29:50 UTC
[kafka] branch trunk updated: KAFKA-14864: Close iterator in KStream windowed aggregation emit on window close (#13470)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new babfb1778b1 KAFKA-14864: Close iterator in KStream windowed aggregation emit on window close (#13470)
babfb1778b1 is described below
commit babfb1778b1fd57d86261adab72ee42bc04caa8b
Author: Victoria Xia <vi...@confluent.io>
AuthorDate: Tue Apr 4 00:29:40 2023 -0400
KAFKA-14864: Close iterator in KStream windowed aggregation emit on window close (#13470)
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
...bstractKStreamTimeWindowAggregateProcessor.java | 31 +++++++++++-----------
.../internals/KStreamSlidingWindowAggregate.java | 9 ++++---
2 files changed, 22 insertions(+), 18 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
index a081a280baf..dfef0d4aaae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
@@ -200,22 +200,23 @@ public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
final long emitRangeUpperBound) {
final long startMs = time.milliseconds();
- final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowToEmit = windowStore
- .fetchAll(emitRangeLowerBound, emitRangeUpperBound);
-
- int emittedCount = 0;
- while (windowToEmit.hasNext()) {
- emittedCount++;
- final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv = windowToEmit.next();
-
- tupleForwarder.maybeForward(
- record.withKey(kv.key)
- .withValue(new Change<>(kv.value.value(), null))
- .withTimestamp(kv.value.timestamp())
- .withHeaders(record.headers()));
+ try (final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowToEmit
+ = windowStore.fetchAll(emitRangeLowerBound, emitRangeUpperBound)) {
+
+ int emittedCount = 0;
+ while (windowToEmit.hasNext()) {
+ emittedCount++;
+ final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv = windowToEmit.next();
+
+ tupleForwarder.maybeForward(
+ record.withKey(kv.key)
+ .withValue(new Change<>(kv.value.value(), null))
+ .withTimestamp(kv.value.timestamp())
+ .withHeaders(record.headers()));
+ }
+ emittedRecordsSensor.record(emittedCount);
+ emitFinalLatencySensor.record(time.milliseconds() - startMs);
}
- emittedRecordsSensor.record(emittedCount);
- emitFinalLatencySensor.record(time.milliseconds() - startMs);
lastEmitWindowCloseTime = windowCloseTime;
internalProcessorContext.addProcessorMetadataKeyValue(storeName, windowCloseTime);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index aa0841a38f0..e75427d6b89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
@@ -122,9 +123,11 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
if (reverseIteratorPossible == null) {
try {
- windowStore.backwardFetch(record.key(), 0L, 0L);
- reverseIteratorPossible = true;
- log.debug("Sliding Windows aggregate using a reverse iterator");
+ try (final WindowStoreIterator<ValueAndTimestamp<VAgg>> iterator
+ = windowStore.backwardFetch(record.key(), 0L, 0L)) {
+ reverseIteratorPossible = true;
+ log.debug("Sliding Windows aggregate using a reverse iterator");
+ }
} catch (final UnsupportedOperationException e) {
reverseIteratorPossible = false;
log.debug("Sliding Windows aggregate using a forward iterator");