You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/06/29 16:22:51 UTC
[kafka] branch trunk updated: [9/N][Emit final] Emit final for session window aggregations (#12204)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ababc4261b [9/N][Emit final] Emit final for session window aggregations (#12204)
ababc4261b is described below
commit ababc4261bfa03ee9d29ae7254ddd0ba988f826d
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Wed Jun 29 09:22:37 2022 -0700
[9/N][Emit final] Emit final for session window aggregations (#12204)
* Add a new session store API to range-query session windows by their end time (KIP-related).
* Augment session window aggregator with emit strategy.
* Minor: consolidate duplicate classes (SessionTupleForwarder is removed in favor of TimestampedTupleForwarder).
* Test: add unit tests for the session window aggregator.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../streams/kstream/SessionWindowedKStream.java | 5 +-
.../kafka/streams/kstream/TimeWindowedKStream.java | 1 +
...bstractKStreamTimeWindowAggregateProcessor.java | 11 +-
.../internals/CogroupedStreamAggregateBuilder.java | 2 +
.../internals/KStreamSessionWindowAggregate.java | 272 ++++++++++++++++-----
.../kstream/internals/KStreamWindowAggregate.java | 7 -
.../kstream/internals/SessionTupleForwarder.java | 56 -----
.../internals/SessionWindowedKStreamImpl.java | 29 ++-
.../kstream/internals/TimeWindowedKStreamImpl.java | 55 +++--
.../internals/TimestampedTupleForwarder.java | 3 +-
.../internals/AbstractReadWriteDecorator.java | 6 +
.../apache/kafka/streams/state/SessionStore.java | 13 +
...tractRocksDBTimeOrderedSegmentedBytesStore.java | 6 +-
.../internals/ChangeLoggingSessionBytesStore.java | 12 +-
.../state/internals/InMemorySessionStore.java | 21 +-
.../state/internals/MeteredSessionStore.java | 12 +
.../state/internals/PrefixedSessionKeySchemas.java | 13 +-
...cksDBTimeOrderedSessionSegmentedBytesStore.java | 33 ++-
.../internals/RocksDBTimeOrderedSessionStore.java | 7 +
.../streams/state/internals/SegmentIterator.java | 2 +-
.../state/internals/SegmentedBytesStore.java | 4 +-
.../streams/state/internals/SessionKeySchema.java | 2 +-
...KStreamSessionWindowAggregateProcessorTest.java | 219 ++++++++++++-----
.../internals/KStreamWindowAggregateTest.java | 2 +-
.../internals/SessionTupleForwarderTest.java | 108 --------
.../internals/SessionWindowedKStreamImplTest.java | 171 +++++++++----
.../internals/TimeWindowedKStreamImplTest.java | 2 +-
.../internals/graph/GraphGraceSearchUtilTest.java | 8 +-
28 files changed, 676 insertions(+), 406 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index c561b62abf..fe897515a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -39,7 +39,7 @@ import java.time.Duration;
* materialized view) that can be queried using the name provided in the {@link Materialized} instance.
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
- * New events are added to sessions until their grace period ends (see {@link SessionWindows#grace(Duration)}).
+ * New events are added to sessions until their grace period ends (see {@link SessionWindows#ofInactivityGapAndGrace(Duration, Duration)}).
* <p>
* A {@code SessionWindowedKStream} must be obtained from a {@link KGroupedStream} via
* {@link KGroupedStream#windowedBy(SessionWindows)}.
@@ -643,4 +643,7 @@ public interface SessionWindowedKStream<K, V> {
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Named named,
final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
+
+ // TODO: add javadoc
+ SessionWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 46ebd267f9..3f36838f20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -649,5 +649,6 @@ public interface TimeWindowedKStream<K, V> {
final Named named,
final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
+ // TODO: add javadoc
TimeWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
index d39dad1f79..a081a280ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
@@ -73,11 +73,10 @@ public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
final String threadId = Thread.currentThread().getName();
+ final String processorName = internalProcessorContext.currentNode().name();
droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
- emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(),
- internalProcessorContext.currentNode().name(), metrics);
- emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(),
- internalProcessorContext.currentNode().name(), metrics);
+ emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), processorName, metrics);
+ emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), processorName, metrics);
windowStore = context.getStateStore(storeName);
if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
@@ -175,7 +174,7 @@ public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
observedStreamTime = Math.max(observedStreamTime, timestamp);
}
- private boolean shouldEmitFinal(final long closeTime) {
+ private boolean shouldEmitFinal(final long windowCloseTime) {
if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
return false;
}
@@ -192,7 +191,7 @@ public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
timeTracker.advanceNextTimeToEmit();
// Only EMIT if the window close time does progress
- return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || lastEmitWindowCloseTime < closeTime;
+ return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || lastEmitWindowCloseTime < windowCloseTime;
}
private void fetchAndEmit(final Record<KIn, VIn> record,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index 17dd413c45..3adc8beec8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -100,6 +100,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
(KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamWindowAggregate<K, K, VOut, W>(
windows,
storeBuilder.name(),
+ EmitStrategy.onWindowUpdate(),
initializer,
kGroupedStream.getValue());
parentProcessors.add(parentProcessor);
@@ -138,6 +139,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
(KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamSessionWindowAggregate<K, K, VOut>(
sessionWindows,
storeBuilder.name(),
+ EmitStrategy.onWindowUpdate(),
initializer,
kGroupedStream.getValue(),
sessionMerger);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index eff7ac327a..f8252358b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -18,8 +18,11 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
@@ -29,6 +32,7 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
@@ -39,6 +43,9 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION;
+import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor;
+import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupplier<KIn, VIn, Windowed<KIn>, VAgg> {
@@ -50,16 +57,19 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
private final Initializer<VAgg> initializer;
private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
private final Merger<? super KIn, VAgg> sessionMerger;
+ private final EmitStrategy emitStrategy;
private boolean sendOldValues = false;
public KStreamSessionWindowAggregate(final SessionWindows windows,
- final String storeName,
- final Initializer<VAgg> initializer,
- final Aggregator<? super KIn, ? super VIn, VAgg> aggregator,
- final Merger<? super KIn, VAgg> sessionMerger) {
+ final String storeName,
+ final EmitStrategy emitStrategy,
+ final Initializer<VAgg> initializer,
+ final Aggregator<? super KIn, ? super VIn, VAgg> aggregator,
+ final Merger<? super KIn, VAgg> sessionMerger) {
this.windows = windows;
this.storeName = storeName;
+ this.emitStrategy = emitStrategy;
this.initializer = initializer;
this.aggregator = aggregator;
this.sessionMerger = sessionMerger;
@@ -83,24 +93,50 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
private SessionStore<KIn, VAgg> store;
- private SessionTupleForwarder<KIn, VAgg> tupleForwarder;
+ private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
private Sensor droppedRecordsSensor;
+ private Sensor emittedRecordsSensor;
+ private Sensor emitFinalLatencySensor;
+ private long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+ private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext;
+
+ private final Time time = Time.SYSTEM;
+ protected final KStreamImplJoin.TimeTracker timeTracker = new KStreamImplJoin.TimeTracker();
@Override
public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
super.init(context);
+ internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
final String threadId = Thread.currentThread().getName();
- droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(),
- metrics);
+ final String processorName = internalProcessorContext.currentNode().name();
+ droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
+ emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), processorName, metrics);
+ emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), processorName, metrics);
store = context.getStateStore(storeName);
- tupleForwarder = new SessionTupleForwarder<>(
- store,
- context,
- new SessionCacheFlushListener<>(context),
- sendOldValues
- );
+
+ if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+ // Restore last emit close time for ON_WINDOW_CLOSE strategy
+ final Long lastEmitWindowCloseTime = internalProcessorContext.processorMetadataForKey(storeName);
+ if (lastEmitWindowCloseTime != null) {
+ this.lastEmitWindowCloseTime = lastEmitWindowCloseTime;
+ }
+ final long emitInterval = StreamsConfig.InternalConfig.getLong(
+ context.appConfigs(),
+ EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
+ 1000L
+ );
+ timeTracker.setEmitInterval(emitInterval);
+
+ tupleForwarder = new TimestampedTupleForwarder<>(context, sendOldValues);
+ } else {
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ store,
+ context,
+ new SessionCacheFlushListener<>(context),
+ sendOldValues);
+ }
}
@Override
@@ -108,25 +144,13 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
// if the key is null, we do not need proceed aggregating
// the record with the table
if (record.key() == null) {
- if (context().recordMetadata().isPresent()) {
- final RecordMetadata recordMetadata = context().recordMetadata().get();
- LOG.warn(
- "Skipping record due to null key. "
- + "topic=[{}] partition=[{}] offset=[{}]",
- recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
- );
- } else {
- LOG.warn(
- "Skipping record due to null key. Topic, partition, and offset not known."
- );
- }
- droppedRecordsSensor.record();
+ logSkippedRecordForNullKey();
return;
}
final long timestamp = record.timestamp();
observedStreamTime = Math.max(observedStreamTime, timestamp);
- final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap();
+ final long windowCloseTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap();
final List<KeyValue<Windowed<KIn>, VAgg>> merged = new ArrayList<>();
final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
@@ -148,55 +172,174 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
}
}
- if (mergedWindow.end() < closeTime) {
- if (context().recordMetadata().isPresent()) {
- final RecordMetadata recordMetadata = context().recordMetadata().get();
- LOG.warn(
- "Skipping record for expired window. " +
- "topic=[{}] " +
- "partition=[{}] " +
- "offset=[{}] " +
- "timestamp=[{}] " +
- "window=[{},{}] " +
- "expiration=[{}] " +
- "streamTime=[{}]",
- recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
- timestamp,
- mergedWindow.start(), mergedWindow.end(),
- closeTime,
- observedStreamTime
- );
- } else {
- LOG.warn(
- "Skipping record for expired window. Topic, partition, and offset not known. " +
- "timestamp=[{}] " +
- "window=[{},{}] " +
- "expiration=[{}] " +
- "streamTime=[{}]",
- timestamp,
- mergedWindow.start(), mergedWindow.end(),
- closeTime,
- observedStreamTime
- );
- }
- droppedRecordsSensor.record();
+ if (mergedWindow.end() < windowCloseTime) {
+ logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow);
} else {
if (!mergedWindow.equals(newSessionWindow)) {
for (final KeyValue<Windowed<KIn>, VAgg> session : merged) {
store.remove(session.key);
- tupleForwarder.maybeForward(
- record.withKey(session.key)
- .withValue(new Change<>(null, session.value)));
+
+ maybeForwardUpdate(session.key, session.value, null);
}
}
agg = aggregator.apply(record.key(), record.value(), agg);
final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
store.put(sessionKey, agg);
+
+ maybeForwardUpdate(sessionKey, null, agg);
+ }
+
+ maybeForwardFinalResult(record, windowCloseTime);
+ }
+
+ private void maybeForwardUpdate(final Windowed<KIn> windowedkey,
+ final VAgg oldAgg,
+ final VAgg newAgg) {
+ if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+ return;
+ }
+
+ // Update the sent record timestamp to the window end time if possible
+ final long newTimestamp = windowedkey.window().end();
+ tupleForwarder.maybeForward(new Record<>(windowedkey, new Change<>(newAgg, sendOldValues ? oldAgg : null), newTimestamp));
+ }
+
+ // TODO: consolidate SessionWindow with TimeWindow to merge common functions
+ private void maybeForwardFinalResult(final Record<KIn, VIn> record, final long windowCloseTime) {
+ if (shouldEmitFinal(windowCloseTime)) {
+ final long emitRangeUpperBound = emitRangeUpperBound(windowCloseTime);
+
+ // if the upper bound is smaller than 0, then there's no window closed ever;
+ // and we can skip range fetching
+ if (emitRangeUpperBound >= 0) {
+ final long emitRangeLowerBound = emitRangeLowerBound();
+
+ if (shouldRangeFetch(emitRangeLowerBound, emitRangeUpperBound)) {
+ fetchAndEmit(record, windowCloseTime, emitRangeLowerBound, emitRangeUpperBound);
+ }
+ }
+ }
+ }
+
+ private boolean shouldEmitFinal(final long windowCloseTime) {
+ if (emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+ return false;
+ }
+
+ final long now = internalProcessorContext.currentSystemTimeMs();
+ // Throttle emit frequency
+ if (now < timeTracker.nextTimeToEmit) {
+ return false;
+ }
+
+ // Schedule next emit time based on now to avoid the case that if system time jumps a lot,
+ // this can be triggered every time
+ timeTracker.nextTimeToEmit = now;
+ timeTracker.advanceNextTimeToEmit();
+
+ // Only EMIT if the window close time does progress
+ return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || lastEmitWindowCloseTime < windowCloseTime;
+ }
+
+ private long emitRangeLowerBound() {
+ return Math.max(0L, lastEmitWindowCloseTime);
+ }
+
+ private long emitRangeUpperBound(final long windowCloseTime) {
+ // Session window's start and end timestamps are inclusive, so
+ // we should minus 1 for the inclusive closed window-end upper bound
+ return windowCloseTime - 1;
+ }
+
+ private boolean shouldRangeFetch(final long emitRangeLowerBound, final long emitRangeUpperBound) {
+ // since a session window could be a single point (i.e. [t, t]),
+ // we need to range fetch and emit even if the upper and lower bound are the same
+ return emitRangeUpperBound >= emitRangeLowerBound;
+ }
+
+ private void fetchAndEmit(final Record<KIn, VIn> record,
+ final long windowCloseTime,
+ final long emitRangeLowerBound,
+ final long emitRangeUpperBound) {
+ final long startMs = time.milliseconds();
+
+ // Only time ordered (indexed) session store should have implemented
+ // this function, otherwise a not-supported exception would throw
+ final KeyValueIterator<Windowed<KIn>, VAgg> windowToEmit = store
+ .findSessions(emitRangeLowerBound, emitRangeUpperBound);
+
+ int emittedCount = 0;
+ while (windowToEmit.hasNext()) {
+ emittedCount++;
+ final KeyValue<Windowed<KIn>, VAgg> kv = windowToEmit.next();
+
tupleForwarder.maybeForward(
- record.withKey(sessionKey)
- .withValue(new Change<>(agg, null)));
+ record.withKey(kv.key)
+ .withValue(new Change<>(kv.value, null))
+ // set the timestamp as the window end timestamp
+ .withTimestamp(kv.key.window().end())
+ .withHeaders(record.headers()));
}
+ emittedRecordsSensor.record(emittedCount);
+ emitFinalLatencySensor.record(time.milliseconds() - startMs);
+
+ lastEmitWindowCloseTime = windowCloseTime;
+ internalProcessorContext.addProcessorMetadataKeyValue(storeName, windowCloseTime);
+ }
+
+ private void logSkippedRecordForNullKey() {
+ if (context().recordMetadata().isPresent()) {
+ final RecordMetadata recordMetadata = context().recordMetadata().get();
+ LOG.warn(
+ "Skipping record due to null key. "
+ + "topic=[{}] partition=[{}] offset=[{}]",
+ recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+ );
+ } else {
+ LOG.warn(
+ "Skipping record due to null key. Topic, partition, and offset not known."
+ );
+ }
+ droppedRecordsSensor.record();
+ }
+
+ private void logSkippedRecordForExpiredWindow(final long timestamp,
+ final long windowExpire,
+ final SessionWindow window) {
+ final String windowString = "[" + window.start() + "," + window.end() + "]";
+
+ if (context().recordMetadata().isPresent()) {
+ final RecordMetadata recordMetadata = context().recordMetadata().get();
+ LOG.warn("Skipping record for expired window. " +
+ "topic=[{}] " +
+ "partition=[{}] " +
+ "offset=[{}] " +
+ "timestamp=[{}] " +
+ "window={} " +
+ "expiration=[{}] " +
+ "streamTime=[{}]",
+ recordMetadata.topic(),
+ recordMetadata.partition(),
+ recordMetadata.offset(),
+ timestamp,
+ windowString,
+ windowExpire,
+ observedStreamTime
+ );
+ } else {
+ LOG.warn("Skipping record for expired window. Topic, partition, and offset not known. " +
+ "timestamp=[{}] " +
+ "window={} " +
+ "expiration=[{}] " +
+ "streamTime=[{}]",
+ timestamp,
+ windowString,
+ windowExpire,
+ observedStreamTime
+ );
+ }
+ droppedRecordsSensor.record();
}
}
@@ -237,5 +380,4 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
key.window().end());
}
}
-
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 81687cb9e0..561524f87e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -49,13 +49,6 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
private boolean sendOldValues = false;
- public KStreamWindowAggregate(final Windows<W> windows,
- final String storeName,
- final Initializer<VAgg> initializer,
- final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
- this(windows, storeName, EmitStrategy.onWindowUpdate(), initializer, aggregator);
- }
-
public KStreamWindowAggregate(final Windows<W> windows,
final String storeName,
final EmitStrategy emitStrategy,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
deleted file mode 100644
index e1c302f875..0000000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.internals.CacheFlushListener;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
-
-/**
- * This class is used to determine if a processor should forward values to child nodes.
- * Forwarding by this class only occurs when caching is not enabled. If caching is enabled,
- * forwarding occurs in the flush listener when the cached store flushes.
- *
- * @param <K>
- * @param <V>
- */
-class SessionTupleForwarder<K, V> {
- private final ProcessorContext<Windowed<K>, Change<V>> context;
- private final boolean sendOldValues;
- private final boolean cachingEnabled;
-
- @SuppressWarnings("unchecked")
- SessionTupleForwarder(final StateStore store,
- final ProcessorContext<Windowed<K>, Change<V>> context,
- final CacheFlushListener<Windowed<K>, V> flushListener,
- final boolean sendOldValues) {
- this.context = context;
- this.sendOldValues = sendOldValues;
- cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
- }
-
- public void maybeForward(final Record<Windowed<K>, Change<V>> record) {
- if (!cachingEnabled) {
- context.forward(
- record.withValue(new Change<>(record.value().newValue, sendOldValues ? record.value().oldValue : null))
- .withTimestamp(record.key() != null ? record.key().window().end() : record.timestamp()));
- }
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index f18c6ef568..c3b05cb118 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -35,6 +36,7 @@ import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
import java.time.Duration;
import java.util.Objects;
@@ -48,6 +50,8 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
private final Merger<K, Long> countMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo;
+ private EmitStrategy emitStrategy = EmitStrategy.onWindowUpdate();
+
SessionWindowedKStreamImpl(final SessionWindows windows,
final InternalStreamsBuilder builder,
final Set<String> subTopologySourceNodes,
@@ -90,6 +94,12 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
return doCount(named, materialized);
}
+ @Override
+ public SessionWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy) {
+ this.emitStrategy = emitStrategy;
+ return this;
+ }
+
private KTable<Windowed<K>, Long> doCount(final Named named,
final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal =
@@ -109,6 +119,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
new KStreamSessionWindowAggregate<>(
windows,
materializedInternal.storeName(),
+ emitStrategy,
aggregateBuilder.countInitializer,
aggregateBuilder.countAggregator,
countMerger),
@@ -157,6 +168,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
new KStreamSessionWindowAggregate<>(
windows,
materializedInternal.storeName(),
+ emitStrategy,
aggregateBuilder.reduceInitializer,
reduceAggregator,
mergerForAggregator(reduceAggregator)
@@ -214,6 +226,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
new KStreamSessionWindowAggregate<>(
windows,
materializedInternal.storeName(),
+ emitStrategy,
initializer,
aggregator,
sessionMerger),
@@ -246,10 +259,15 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
);
break;
case ROCKS_DB:
- supplier = Stores.persistentSessionStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod)
- );
+ supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+ new RocksDbTimeOrderedSessionBytesStoreSupplier(
+ materialized.storeName(),
+ retentionPeriod,
+ true) :
+ Stores.persistentSessionStore(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod)
+ );
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
@@ -268,7 +286,8 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
builder.withLoggingDisabled();
}
- if (materialized.cachingEnabled()) {
+ // do not enable cache if the emit final strategy is used
+ if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
builder.withCachingEnabled();
}
return builder;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 1f2c732878..c07b783978 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -110,14 +110,17 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
return aggregateBuilder.build(
- new NamedInternal(aggregateName),
- materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
- materializedInternal.queryableStoreName(),
- materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
- materializedInternal.valueSerde());
-
-
+ new NamedInternal(aggregateName),
+ materialize(materializedInternal),
+ new KStreamWindowAggregate<>(
+ windows,
+ materializedInternal.storeName(),
+ emitStrategy,
+ aggregateBuilder.countInitializer,
+ aggregateBuilder.countAggregator),
+ materializedInternal.queryableStoreName(),
+ materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
+ materializedInternal.valueSerde());
}
@Override
@@ -158,14 +161,17 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
return aggregateBuilder.build(
- new NamedInternal(aggregateName),
- materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, initializer, aggregator),
- materializedInternal.queryableStoreName(),
- materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
- materializedInternal.valueSerde());
-
-
+ new NamedInternal(aggregateName),
+ materialize(materializedInternal),
+ new KStreamWindowAggregate<>(
+ windows,
+ materializedInternal.storeName(),
+ emitStrategy,
+ initializer,
+ aggregator),
+ materializedInternal.queryableStoreName(),
+ materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
+ materializedInternal.valueSerde());
}
@Override
@@ -205,12 +211,17 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
return aggregateBuilder.build(
- new NamedInternal(reduceName),
- materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
- materializedInternal.queryableStoreName(),
- materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
- materializedInternal.valueSerde());
+ new NamedInternal(reduceName),
+ materialize(materializedInternal),
+ new KStreamWindowAggregate<>(
+ windows,
+ materializedInternal.storeName(),
+ emitStrategy,
+ aggregateBuilder.reduceInitializer,
+ aggregatorForReducer(reducer)),
+ materializedInternal.queryableStoreName(),
+ materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
+ materializedInternal.valueSerde());
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index 34acbd99bd..bc686ada72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -20,6 +20,7 @@ import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
/**
@@ -38,7 +39,7 @@ class TimestampedTupleForwarder<K, V> {
@SuppressWarnings({"unchecked", "rawtypes"})
TimestampedTupleForwarder(final StateStore store,
final ProcessorContext<K, Change<V>> context,
- final TimestampedCacheFlushListener<K, V> flushListener,
+ final CacheFlushListener<K, ?> flushListener,
final boolean sendOldValues) {
this.context = (InternalProcessorContext<K, Change<V>>) context;
this.sendOldValues = sendOldValues;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index aff099af5f..3c7f70ea07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -259,6 +259,12 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
}
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final long earliestSessionEndTime,
+ final long latestSessionEndTime) {
+ return wrapped().findSessions(earliestSessionEndTime, latestSessionEndTime); // forward the end-time range query unchanged to the wrapped store
+ }
+
@Override
public void remove(final Windowed<K> sessionKey) {
wrapped().remove(sessionKey);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
index cbc1cc5b96..76a4317394 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
@@ -39,6 +39,19 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
*/
public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {
+ /**
+ * Return all the session window entries that end within the specified range (both ends are inclusive), e.g. to retrieve all closed and immutable windows.
+ * @param earliestSessionEndTime earliest session end time to search from, inclusive
+ * @param latestSessionEndTime latest session end time to search to, inclusive
+ * @return an iterator over the matching session windows and their aggregates
+ * @throws UnsupportedOperationException if this implementation does not support the query (the default behavior)
+ */
+ default KeyValueIterator<Windowed<K>, AGG> findSessions(final long earliestSessionEndTime,
+ final long latestSessionEndTime) {
+ throw new UnsupportedOperationException(
+ "This API is not supported by this implementation of SessionStore.");
+ }
+
@Override
default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final Instant earliestSessionEndTime,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
index f7216412f0..0398f0ca06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
@@ -142,8 +142,7 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
final long to,
final boolean forward) {
if (indexKeySchema.isPresent()) {
- final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, from, to,
- forward);
+ final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, from, to, forward);
final Bytes binaryFrom = indexKeySchema.get().lowerRangeFixedSize(key, from);
final Bytes binaryTo = indexKeySchema.get().upperRangeFixedSize(key, to);
@@ -156,8 +155,7 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
forward));
}
- final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, from, to,
- forward);
+ final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, from, to, forward);
final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, from);
final Bytes binaryTo = baseKeySchema.upperRangeFixedSize(key, to);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index ff387ef38e..fd32798801 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -31,9 +31,9 @@ import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils
* Simple wrapper around a {@link SessionStore} to support writing
* updates to a changelog
*/
-class ChangeLoggingSessionBytesStore
- extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]>
- implements SessionStore<Bytes, byte[]> {
+public class ChangeLoggingSessionBytesStore
+ extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]>
+ implements SessionStore<Bytes, byte[]> {
private InternalProcessorContext context;
@@ -95,6 +95,12 @@ class ChangeLoggingSessionBytesStore
return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime);
}
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long earliestSessionEndTime,
+ final long latestSessionEndTime) {
+ return wrapped().findSessions(earliestSessionEndTime, latestSessionEndTime); // read-only query: delegate to the wrapped store, nothing is written to the changelog
+ }
+
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key) {
return wrapped().backwardFetch(key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 97984dd156..579abc3678 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -202,25 +202,36 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
@Override
public byte[] fetchSession(final Bytes key,
- final long earliestSessionEndTime,
- final long latestSessionStartTime) {
+ final long sessionStartTime,
+ final long sessionEndTime) {
removeExpiredSegments();
Objects.requireNonNull(key, "key cannot be null");
// Only need to search if the record hasn't expired yet
- if (latestSessionStartTime > observedStreamTime - retentionPeriod) {
- final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(latestSessionStartTime);
+ if (sessionEndTime > observedStreamTime - retentionPeriod) {
+ final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(sessionEndTime);
if (keyMap != null) {
final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(key);
if (startTimeMap != null) {
- return startTimeMap.get(earliestSessionEndTime);
+ return startTimeMap.get(sessionStartTime);
}
}
}
return null;
}
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long earliestSessionEndTime,
+ final long latestSessionEndTime) {
+ removeExpiredSegments();
+ // Both end-time bounds are inclusive; the null key bounds and Long.MAX_VALUE presumably disable key / start-time filtering (confirm in registerNewIterator).
+ // NavigableMap#subMap throws IllegalArgumentException when from > to, so an inverted range is mapped to an empty view instead of failing.
+ final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>> endTimeSubMap = earliestSessionEndTime <= latestSessionEndTime ? endTimeMap.subMap(earliestSessionEndTime, true, latestSessionEndTime, true) : endTimeMap.subMap(earliestSessionEndTime, false, earliestSessionEndTime, false);
+
+ return registerNewIterator(null, null, Long.MAX_VALUE, endTimeSubMap.entrySet().iterator(), true);
+ }
+
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key,
final long earliestSessionEndTime,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index ad75e35e7a..bc4f2169b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -346,6 +346,18 @@ public class MeteredSessionStore<K, V>
time);
}
+ @Override
+ public KeyValueIterator<Windowed<K>, V> findSessions(final long earliestSessionEndTime,
+ final long latestSessionEndTime) {
+ return new MeteredWindowedKeyValueIterator<>( // wrap the bytes-level iterator so keys/values are deserialized via serdes; fetchSensor/time presumably record fetch latency
+ wrapped().findSessions(earliestSessionEndTime, latestSessionEndTime),
+ fetchSensor,
+ streamsMetrics,
+ serdes::keyFrom,
+ serdes::valueFrom,
+ time);
+ }
+
@Override
public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K keyFrom,
final K keyTo,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
index c98ae83390..2ac25277ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
@@ -77,10 +77,8 @@ public class PrefixedSessionKeySchemas {
}
/**
- *
* @param key the key in the range
* @param to the latest start time
- * @return
*/
@Override
public Bytes upperRangeFixedSize(final Bytes key, final long to) {
@@ -88,10 +86,8 @@ public class PrefixedSessionKeySchemas {
}
/**
- *
* @param key the key in the range
* @param from the earliest end timestamp in the range
- * @return
*/
@Override
public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
@@ -105,7 +101,10 @@ public class PrefixedSessionKeySchemas {
@Override
public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom,
- final Bytes binaryKeyTo, final long from, final long to, final boolean forward) {
+ final Bytes binaryKeyTo,
+ final long from,
+ final long to,
+ final boolean forward) {
return iterator -> {
while (iterator.hasNext()) {
final Bytes bytes = iterator.peekNextKey();
@@ -204,7 +203,9 @@ public class PrefixedSessionKeySchemas {
final long endTime) {
buf.putLong(endTime);
buf.putLong(startTime);
- buf.put(key.get());
+ if (key != null) {
+ buf.put(key.get());
+ }
}
public static Bytes toBinary(final Bytes key,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
index 4265150eb9..172d321881 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -38,7 +39,7 @@ import org.rocksdb.WriteBatch;
*/
public class RocksDBTimeOrderedSessionSegmentedBytesStore extends AbstractRocksDBTimeOrderedSegmentedBytesStore {
- private class SessionKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator {
+ private class SessionKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator {
SessionKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, byte[]> indexIterator) {
super(indexIterator);
}
@@ -71,6 +72,36 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore extends AbstractRocksD
));
}
+ public KeyValueIterator<Bytes, byte[]> fetchSessions(final long earliestSessionEndTime,
+ final long latestSessionEndTime) {
+ final List<KeyValueSegment> searchSpace = segments.segments(earliestSessionEndTime, latestSessionEndTime, true);
+ // Returns every session, across all keys (null key in the bounds), whose end time is in [earliestSessionEndTime, latestSessionEndTime].
+ // Ideally the upper bound would be [0, latestSE, FF] to cover every possible key ending at latestSE,
+ // but bounds can only be derived from timestamps here, so use the slightly larger upper bound [0, latestSE + 1].
+ final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(null, earliestSessionEndTime);
+ final Bytes binaryTo = baseKeySchema.lowerRangeFixedSize(null, latestSessionEndTime + 1); // NOTE(review): wraps to Long.MIN_VALUE if latestSessionEndTime == Long.MAX_VALUE; confirm the resulting byte bound is still correct
+ // The byte range is only a coarse bound, so each entry's decoded end time is re-checked below and out-of-range entries are skipped.
+ return new SegmentIterator<>(
+ searchSpace.iterator(),
+ iterator -> {
+ while (iterator.hasNext()) {
+ final Bytes bytes = iterator.peekNextKey();
+
+ final Windowed<Bytes> windowedKey = TimeFirstSessionKeySchema.from(bytes);
+ final long endTime = windowedKey.window().end();
+
+ if (endTime <= latestSessionEndTime && endTime >= earliestSessionEndTime) {
+ return true;
+ }
+ iterator.next();
+ }
+ return false;
+ },
+ binaryFrom,
+ binaryTo,
+ true);
+ }
+
public void remove(final Windowed<Bytes> key) {
remove(TimeFirstSessionKeySchema.toBinary(key));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
index 5b72163757..deb6028ef6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
@@ -61,6 +61,13 @@ public class RocksDBTimeOrderedSessionStore
);
}
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long earliestSessionEndTime,
+ final long latestSessionEndTime) {
+ final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchSessions(earliestSessionEndTime, latestSessionEndTime);
+ return new WrappedSessionStoreIterator(bytesIterator, TimeFirstSessionKeySchema::from); // decode time-first binary keys into Windowed<Bytes>
+ }
+
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key,
final long earliestSessionEndTime,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index 6191c49888..9aabc787c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -91,7 +91,7 @@ class SegmentIterator<S extends Segment> implements KeyValueIterator<Bytes, byte
try {
hasNext = hasNextCondition.hasNext(currentIterator);
} catch (final InvalidStateStoreException e) {
- //already closed so ignore
+ // already closed so ignore
}
return hasNext;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index 80b5a91ffa..1ef6a932f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -91,8 +91,8 @@ public interface SegmentedBytesStore extends StateStore {
/**
* Gets all the key-value pairs that belong to the windows within in the given time range.
*
- * @param from the beginning of the time slot from which to search
- * @param to the end of the time slot from which to search
+ * @param from the beginning of the time slot from which to search (inclusive)
+ * @param to the end of the time slot from which to search (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if null is used for any key
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 505bbddc80..f21e47fd87 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -35,7 +35,7 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
public static int keyByteLength(final Bytes key) {
- return key.get().length + 2 * TIMESTAMP_SIZE;
+ return (key == null ? 0 : key.get().length) + 2 * TIMESTAMP_SIZE;
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 21c6e6af12..fc993b63a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -22,11 +22,11 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
@@ -40,9 +40,11 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
@@ -51,13 +53,18 @@ import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
+import java.util.Properties;
import java.util.stream.Collectors;
import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -69,29 +76,39 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-
+@RunWith(Parameterized.class)
public class KStreamSessionWindowAggregateProcessorTest {
private static final long GAP_MS = 5 * 60 * 1000L;
private static final String STORE_NAME = "session-store";
+ private final MockTime time = new MockTime();
+ private final Metrics metrics = new Metrics();
+ private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, time);
private final String threadId = Thread.currentThread().getName();
private final Initializer<Long> initializer = () -> 0L;
private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1;
private final Merger<String, Long> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo;
- private final KStreamSessionWindowAggregate<String, String, Long> sessionAggregator =
- new KStreamSessionWindowAggregate<>(
- SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)),
- STORE_NAME,
- initializer,
- aggregator,
- sessionMerger);
-
private final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> results = new ArrayList<>();
- private final Processor<String, String, Windowed<String>, Change<Long>> processor = sessionAggregator.get();
- private SessionStore<String, Long> sessionStore;
+
private InternalMockProcessorContext<Windowed<String>, Change<Long>> context;
- private final Metrics metrics = new Metrics();
+ private KStreamSessionWindowAggregate<String, String, Long> sessionAggregator;
+ private Processor<String, String, Windowed<String>, Change<Long>> processor;
+ private SessionStore<String, Long> sessionStore;
+
+ @Parameterized.Parameter
+ public EmitStrategy.StrategyType type;
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Object[]> data() {
+ return asList(new Object[][] { // every test runs once per emit strategy: emit-on-update and emit-on-window-close (emit final)
+ {EmitStrategy.StrategyType.ON_WINDOW_UPDATE},
+ {EmitStrategy.StrategyType.ON_WINDOW_CLOSE}
+ });
+ }
+
+ private EmitStrategy emitStrategy;
+ private boolean emitFinal;
@Before
public void setup() {
@@ -99,23 +116,44 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
private void setup(final boolean enableCache) {
- final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime());
+ // Always process
+ final Properties prop = StreamsTestUtils.getStreamsConfig();
+ prop.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0);
+ final StreamsConfig config = new StreamsConfig(prop);
+
context = new InternalMockProcessorContext<Windowed<String>, Change<Long>>(
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.String(),
streamsMetrics,
- new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+ config,
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 100000, streamsMetrics),
- Time.SYSTEM
+ time
) {
@Override
public <K extends Windowed<String>, V extends Change<Long>> void forward(final Record<K, V> record) {
results.add(new KeyValueTimestamp<>(record.key(), record.value(), record.timestamp()));
}
};
+
+
+ emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
+ emitStrategy = EmitStrategy.StrategyType.forType(type);
+
+ sessionAggregator = new KStreamSessionWindowAggregate<>(
+ SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)),
+ STORE_NAME,
+ emitStrategy,
+ initializer,
+ aggregator,
+ sessionMerger);
+
+ if (processor != null) {
+ processor.close();
+ }
+ processor = sessionAggregator.get();
+
// Set initial timestamp for CachingSessionStore to prepare entry from as default
// InternalMockProcessorContext#timestamp returns -1.
context.setTime(0L);
@@ -126,14 +164,14 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
private void initStore(final boolean enableCaching) {
- final StoreBuilder<SessionStore<String, Long>> storeBuilder =
- Stores.sessionStoreBuilder(
- Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)),
- Serdes.String(),
- Serdes.Long())
+ final SessionBytesStoreSupplier supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+ new RocksDbTimeOrderedSessionBytesStoreSupplier(STORE_NAME, GAP_MS * 3, true) :
+ Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3));
+
+ final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(supplier, Serdes.String(), Serdes.Long())
.withLoggingDisabled();
- if (enableCaching) {
+ if (enableCaching && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
storeBuilder.withCachingEnabled();
}
@@ -147,6 +185,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
@After
public void closeStore() {
sessionStore.close();
+ processor.close();
}
@Test
@@ -198,35 +237,51 @@ public class KStreamSessionWindowAggregateProcessorTest {
@Test
public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() {
final String sessionId = "mel";
- long time = 0;
- processor.process(new Record<>(sessionId, "first", time));
- final long time1 = time += GAP_MS + 1;
- processor.process(new Record<>(sessionId, "second", time1));
- processor.process(new Record<>(sessionId, "second", time1));
- final long time2 = time += GAP_MS + 1;
- processor.process(new Record<>(sessionId, "third", time2));
- processor.process(new Record<>(sessionId, "third", time2));
- processor.process(new Record<>(sessionId, "third", time2));
+ long now = 0;
+ processor.process(new Record<>(sessionId, "first", now));
+ now += GAP_MS + 1;
+ processor.process(new Record<>(sessionId, "second", now));
+ processor.process(new Record<>(sessionId, "second", now));
+ now += GAP_MS + 1;
+ processor.process(new Record<>(sessionId, "third", now));
+ processor.process(new Record<>(sessionId, "third", now));
+ processor.process(new Record<>(sessionId, "third", now));
sessionStore.flush();
- assertEquals(
- Arrays.asList(
- new KeyValueTimestamp<>(
- new Windowed<>(sessionId, new SessionWindow(0, 0)),
- new Change<>(1L, null),
- 0L),
- new KeyValueTimestamp<>(
- new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)),
- new Change<>(2L, null),
- GAP_MS + 1),
- new KeyValueTimestamp<>(
- new Windowed<>(sessionId, new SessionWindow(time, time)),
- new Change<>(3L, null),
- time)
- ),
- results
- );
+ if (emitFinal) {
+ assertEquals(
+ Arrays.asList(
+ new KeyValueTimestamp<>(
+ new Windowed<>(sessionId, new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)),
+ new Change<>(2L, null),
+ GAP_MS + 1)
+ ),
+ results
+ );
+ } else {
+ assertEquals(
+ Arrays.asList(
+ new KeyValueTimestamp<>(
+ new Windowed<>(sessionId, new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)),
+ new Change<>(2L, null),
+ GAP_MS + 1),
+ new KeyValueTimestamp<>(
+ new Windowed<>(sessionId, new SessionWindow(now, now)),
+ new Change<>(3L, null),
+ now)
+ ),
+ results
+ );
+ }
}
@Test
@@ -264,8 +319,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
sessionStore.flush();
- assertEquals(
- Arrays.asList(
+ if (emitFinal) {
+ assertEquals(Arrays.asList(
new KeyValueTimestamp<>(
new Windowed<>("a", new SessionWindow(0, 0)),
new Change<>(1L, null),
@@ -281,22 +336,44 @@ public class KStreamSessionWindowAggregateProcessorTest {
new KeyValueTimestamp<>(
new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)),
new Change<>(2L, null),
- GAP_MS / 2),
- new KeyValueTimestamp<>(
- new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 1)),
- new Change<>(1L, null),
- GAP_MS + 1),
- new KeyValueTimestamp<>(
- new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)),
- new Change<>(2L, null),
- GAP_MS + 1 + GAP_MS / 2),
- new KeyValueTimestamp<>(new Windowed<>(
- "c",
- new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null),
- GAP_MS + 1 + GAP_MS / 2)
- ),
- results
- );
+ GAP_MS / 2)
+ ),
+ results);
+ } else {
+ assertEquals(
+ Arrays.asList(
+ new KeyValueTimestamp<>(
+ new Windowed<>("a", new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>("b", new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>("c", new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)),
+ new Change<>(2L, null),
+ GAP_MS / 2),
+ new KeyValueTimestamp<>(
+ new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 1)),
+ new Change<>(1L, null),
+ GAP_MS + 1),
+ new KeyValueTimestamp<>(
+ new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)),
+ new Change<>(2L, null),
+ GAP_MS + 1 + GAP_MS / 2),
+ new KeyValueTimestamp<>(new Windowed<>(
+ "c",
+ new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null),
+ GAP_MS + 1 + GAP_MS / 2)
+ ),
+ results
+ );
+ }
}
@Test
@@ -314,6 +391,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
@Test
public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() {
+ if (emitFinal)
+ return;
+
initStore(false);
processor.init(context);
@@ -342,6 +422,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
@Test
public void shouldImmediatelyForwardRemovedSessionsWhenMerging() {
+ if (emitFinal)
+ return;
+
initStore(false);
processor.init(context);
@@ -399,6 +482,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
final Processor<String, String, Windowed<String>, Change<Long>> processor = new KStreamSessionWindowAggregate<>(
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(0L)),
STORE_NAME,
+ EmitStrategy.onWindowUpdate(),
initializer,
aggregator,
sessionMerger
@@ -464,6 +548,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
final Processor<String, String, Windowed<String>, Change<Long>> processor = new KStreamSessionWindowAggregate<>(
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1L)),
STORE_NAME,
+ EmitStrategy.onWindowUpdate(),
initializer,
aggregator,
sessionMerger
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 484ad1f059..8af320ae70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -99,7 +99,7 @@ public class KStreamWindowAggregateTest {
@Parameter(1)
public boolean withCache;
- public EmitStrategy emitStrategy;
+ private EmitStrategy emitStrategy;
private boolean emitFinal;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
deleted file mode 100644
index 60b37bb052..0000000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-
-public class SessionTupleForwarderTest {
-
- @Test
- public void shouldSetFlushListenerOnWrappedStateStore() {
- setFlushListener(true);
- setFlushListener(false);
- }
-
- private void setFlushListener(final boolean sendOldValues) {
- final WrappedStateStore<StateStore, Windowed<Object>, Object> store = mock(WrappedStateStore.class);
- final SessionCacheFlushListener<Object, Object> flushListener = mock(SessionCacheFlushListener.class);
-
- expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false);
- replay(store);
-
- new SessionTupleForwarder<>(store, null, flushListener, sendOldValues);
-
- verify(store);
- }
-
- @Test
- public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() {
- shouldForwardRecordsIfWrappedStateStoreDoesNotCache(false);
- shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true);
- }
-
- private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValued) {
- final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
- final ProcessorContext<Windowed<String>, Change<String>> context = mock(
- ProcessorContext.class);
-
- expect(store.setFlushListener(null, sendOldValued)).andReturn(false);
- if (sendOldValued) {
- context.forward(
- new Record<>(
- new Windowed<>("key", new SessionWindow(21L, 42L)),
- new Change<>("value", "oldValue"),
- 42L));
- } else {
- context.forward(
- new Record<>(
- new Windowed<>("key", new SessionWindow(21L, 42L)),
- new Change<>("value", null),
- 42L));
- }
- expectLastCall();
- replay(store, context);
-
- new SessionTupleForwarder<>(store, context, null, sendOldValued)
- .maybeForward(
- new Record<>(
- new Windowed<>("key", new SessionWindow(21L, 42L)),
- new Change<>("value", "oldValue"),
- 42L));
-
- verify(store, context);
- }
-
- @Test
- public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
- final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
- final ProcessorContext<Windowed<String>, Change<String>> context = mock(ProcessorContext.class);
-
- expect(store.setFlushListener(null, false)).andReturn(true);
- replay(store, context);
-
- new SessionTupleForwarder<>(store, context, null, false)
- .maybeForward(
- new Record<>(
- new Windowed<>("key", new SessionWindow(21L, 42L)),
- new Change<>("value", "oldValue"),
- 42L));
-
- verify(store, context);
- }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index a77dcdb0a2..41876581b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -21,10 +21,12 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
@@ -34,8 +36,11 @@ import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore;
+import org.apache.kafka.streams.state.internals.MeteredSessionStore;
+import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedSessionStore;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
@@ -43,29 +48,58 @@ import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
+@RunWith(Parameterized.class)
public class SessionWindowedKStreamImplTest {
private static final String TOPIC = "input";
private final StreamsBuilder builder = new StreamsBuilder();
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final Merger<String, String> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + "+" + aggTwo;
+
private SessionWindowedKStream<String, String> stream;
+ @Parameterized.Parameter
+ public EmitStrategy.StrategyType type;
+
+ private boolean emitFinal;
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Object[]> data() {
+ return asList(new Object[][] {
+ {EmitStrategy.StrategyType.ON_WINDOW_UPDATE},
+ {EmitStrategy.StrategyType.ON_WINDOW_CLOSE}
+ });
+ }
+
@Before
public void before() {
+ final EmitStrategy emitStrategy = EmitStrategy.StrategyType.forType(type);
+ emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
+
+ // Set interval to 0 so that it always tries to emit
+ props.setProperty(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, "0");
+
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
this.stream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)));
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)))
+ .emitStrategy(emitStrategy);
}
@Test
@@ -89,19 +123,30 @@ public class SessionWindowedKStreamImplTest {
processData(driver);
}
- final Map<Windowed<String>, ValueAndTimestamp<Long>> result =
- supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
-
- assertThat(result.size(), equalTo(3));
- assertThat(
- result.get(new Windowed<>("1", new SessionWindow(10L, 15L))),
- equalTo(ValueAndTimestamp.make(2L, 15L)));
- assertThat(
- result.get(new Windowed<>("2", new SessionWindow(599L, 600L))),
- equalTo(ValueAndTimestamp.make(2L, 600L)));
- assertThat(
- result.get(new Windowed<>("1", new SessionWindow(600L, 600L))),
- equalTo(ValueAndTimestamp.make(1L, 600L)));
+ final ArrayList<KeyValueTimestamp<Windowed<String>, Long>> processed =
+ supplier.theCapturedProcessor().processed();
+
+ if (emitFinal) {
+ assertEquals(
+ Collections.singletonList(
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 15L)), 2L, 15L)
+ ),
+ processed
+ );
+ } else {
+ assertEquals(
+ asList(
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 10L)), 1L, 10L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 10L)), null, 10L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 15L)), 2L, 15L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(600L, 600L)), 1L, 600L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(600L, 600L)), 1L, 600L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(600L, 600L)), null, 600L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(599L, 600L)), 2L, 600L)
+ ),
+ processed
+ );
+ }
}
@Test
@@ -115,19 +160,30 @@ public class SessionWindowedKStreamImplTest {
processData(driver);
}
- final Map<Windowed<String>, ValueAndTimestamp<String>> result =
- supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
-
- assertThat(result.size(), equalTo(3));
- assertThat(
- result.get(new Windowed<>("1", new SessionWindow(10, 15))),
- equalTo(ValueAndTimestamp.make("1+2", 15L)));
- assertThat(
- result.get(new Windowed<>("2", new SessionWindow(599L, 600))),
- equalTo(ValueAndTimestamp.make("1+2", 600L)));
- assertThat(
- result.get(new Windowed<>("1", new SessionWindow(600, 600))),
- equalTo(ValueAndTimestamp.make("3", 600L)));
+ final ArrayList<KeyValueTimestamp<Windowed<String>, String>> processed =
+ supplier.theCapturedProcessor().processed();
+
+ if (emitFinal) {
+ assertEquals(
+ Collections.singletonList(
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 15L)), "1+2", 15L)
+ ),
+ processed
+ );
+ } else {
+ assertEquals(
+ asList(
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 10L)), "1", 10L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 10L)), null, 10L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 15L)), "1+2", 15L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(600L, 600L)), "3", 600L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(600L, 600L)), "1", 600L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(600L, 600L)), null, 600L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(599L, 600L)), "1+2", 600L)
+ ),
+ processed
+ );
+ }
}
@Test
@@ -143,19 +199,30 @@ public class SessionWindowedKStreamImplTest {
processData(driver);
}
- final Map<Windowed<String>, ValueAndTimestamp<String>> result =
- supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
-
- assertThat(result.size(), equalTo(3));
- assertThat(
- result.get(new Windowed<>("1", new SessionWindow(10, 15))),
- equalTo(ValueAndTimestamp.make("0+0+1+2", 15L)));
- assertThat(
- result.get(new Windowed<>("2", new SessionWindow(599, 600))),
- equalTo(ValueAndTimestamp.make("0+0+1+2", 600L)));
- assertThat(
- result.get(new Windowed<>("1", new SessionWindow(600, 600))),
- equalTo(ValueAndTimestamp.make("0+3", 600L)));
+ final ArrayList<KeyValueTimestamp<Windowed<String>, String>> processed =
+ supplier.theCapturedProcessor().processed();
+
+ if (emitFinal) {
+ assertEquals(
+ Collections.singletonList(
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 15L)), "0+0+1+2", 15L)
+ ),
+ processed
+ );
+ } else {
+ assertEquals(
+ asList(
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 10L)), "0+1", 10L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 10L)), null, 10L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 15L)), "0+0+1+2", 15L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(600L, 600L)), "0+3", 600L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(600L, 600L)), "0+1", 600L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(600L, 600L)), null, 600L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(599L, 600L)), "0+0+1+2", 600L)
+ ),
+ processed
+ );
+ }
}
@Test
@@ -292,6 +359,26 @@ public class SessionWindowedKStreamImplTest {
assertThrows(NullPointerException.class, () -> stream.count((Materialized<String, Long, SessionStore<Bytes, byte[]>>) null));
}
+ @Test
+ public void shouldNotEnableCachingWithEmitFinal() {
+ if (!emitFinal)
+ return;
+ stream.aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ sessionMerger,
+ Materialized.<String, String, SessionStore<Bytes, byte[]>>as("aggregated").withValueSerde(Serdes.String()));
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final SessionStore<String, String> store = driver.getSessionStore("aggregated");
+ // Assert each layer's type before casting it, so a wrong store stack fails as an assertion rather than a ClassCastException.
+ assertThat(store, instanceOf(MeteredSessionStore.class));
+ final Object changeLogging = ((WrappedStateStore<?, ?, ?>) store).wrapped();
+ assertThat(changeLogging, instanceOf(ChangeLoggingSessionBytesStore.class));
+ assertThat(((WrappedStateStore<?, ?, ?>) changeLogging).wrapped(), instanceOf(RocksDBTimeOrderedSessionStore.class));
+ }
+ }
+
private void processData(final TopologyTestDriver driver) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 82017317c8..5ac43ac808 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -85,7 +85,7 @@ public class TimeWindowedKStreamImplTest {
private boolean emitFinal;
@Parameterized.Parameters(name = "{0}_cache:{1}")
- public static Collection<Object[]> getKeySchema() {
+ public static Collection<Object[]> data() {
return asList(new Object[][] {
{StrategyType.ON_WINDOW_UPDATE, true},
{StrategyType.ON_WINDOW_UPDATE, false},
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index f8a7073dab..1de78a8b85 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
@@ -86,6 +87,7 @@ public class GraphGraceSearchUtilTest {
new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
windows,
"asdf",
+ EmitStrategy.onWindowUpdate(),
null,
null
),
@@ -108,6 +110,7 @@ public class GraphGraceSearchUtilTest {
new KStreamSessionWindowAggregate<String, Long, Integer>(
windows,
"asdf",
+ EmitStrategy.onWindowUpdate(),
null,
null,
null
@@ -127,7 +130,7 @@ public class GraphGraceSearchUtilTest {
final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
"asdf",
new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
- windows, "asdf", null, null, null
+ windows, "asdf", EmitStrategy.onWindowUpdate(), null, null, null
), "asdf"),
(StoreBuilder<?>) null
);
@@ -167,6 +170,7 @@ public class GraphGraceSearchUtilTest {
new KStreamSessionWindowAggregate<String, Long, Integer>(
windows,
"asdf",
+ EmitStrategy.onWindowUpdate(),
null,
null,
null
@@ -194,6 +198,7 @@ public class GraphGraceSearchUtilTest {
new KStreamSessionWindowAggregate<String, Long, Integer>(
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)),
"asdf",
+ EmitStrategy.onWindowUpdate(),
null,
null,
null
@@ -209,6 +214,7 @@ public class GraphGraceSearchUtilTest {
new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)),
"asdf",
+ EmitStrategy.onWindowUpdate(),
null,
null
),