You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/08/31 22:23:41 UTC
[kafka] branch trunk updated: KAFKA-5636: SlidingWindows (KIP-450)
(#9039)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 85b6545 KAFKA-5636: SlidingWindows (KIP-450) (#9039)
85b6545 is described below
commit 85b6545b8159885c57ab67e08b7185be8a607988
Author: leah <lt...@confluent.io>
AuthorDate: Mon Aug 31 17:22:00 2020 -0500
KAFKA-5636: SlidingWindows (KIP-450) (#9039)
Add SlidingWindows API, implementation, and tests.
An edge case and an optimization are left to follow-on work.
Implements: KIP-450
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Matthias J. Sax <mj...@apache.org>, John Roesler <vv...@apache.org>
---
.../kafka/streams/kstream/CogroupedKStream.java | 9 +
.../kafka/streams/kstream/KGroupedStream.java | 7 +
.../kafka/streams/kstream/SlidingWindows.java | 140 +++++
.../kstream/internals/CogroupedKStreamImpl.java | 14 +
.../internals/CogroupedStreamAggregateBuilder.java | 75 ++-
.../kstream/internals/KGroupedStreamImpl.java | 16 +
.../internals/KStreamSlidingWindowAggregate.java | 307 +++++++++
.../SessionWindowedCogroupedKStreamImpl.java | 42 +-
...va => SlidingWindowedCogroupedKStreamImpl.java} | 118 ++--
...amImpl.java => SlidingWindowedKStreamImpl.java} | 144 ++---
.../TimeWindowedCogroupedKStreamImpl.java | 52 +-
.../kstream/internals/TimeWindowedKStreamImpl.java | 45 +-
.../internals/graph/GraphGraceSearchUtil.java | 6 +
.../KStreamAggregationIntegrationTest.java | 204 ++++++
.../kafka/streams/kstream/SlidingWindowsTest.java | 82 +++
.../internals/CogroupedKStreamImplTest.java | 55 +-
.../kstream/internals/KGroupedStreamImplTest.java | 317 +++++++---
.../KStreamSlidingWindowAggregateTest.java | 692 +++++++++++++++++++++
.../SlidingWindowedCogroupedKStreamImplTest.java | 249 ++++++++
.../internals/SlidingWindowedKStreamImplTest.java | 439 +++++++++++++
.../kstream/internals/SuppressScenarioTest.java | 84 +++
21 files changed, 2721 insertions(+), 376 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index 35bc8de..616b3b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -276,6 +276,15 @@ public interface CogroupedKStream<K, VOut> {
<W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final Windows<W> windows);
/**
+ * Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform sliding
+ * windowed aggregations.
+ *
+ * @param windows the specification of the aggregation {@link SlidingWindows}
+ * @return an instance of {@link TimeWindowedCogroupedKStream}
+ */
+ TimeWindowedCogroupedKStream<K, VOut> windowedBy(final SlidingWindows windows);
+
+ /**
* Create a new {@link SessionWindowedCogroupedKStream} instance that can be used to perform session
* windowed aggregations.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 56faea0..d805f8b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -530,6 +530,13 @@ public interface KGroupedStream<K, V> {
<W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
/**
+ * Create a new {@link TimeWindowedKStream} instance that can be used to perform sliding windowed aggregations.
+ * @param windows the specification of the aggregation {@link SlidingWindows}
+ * @return an instance of {@link TimeWindowedKStream}
+ */
+ TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
+
+ /**
* Create a new {@link SessionWindowedKStream} instance that can be used to perform session windowed aggregations.
* @param windows the specification of the aggregation {@link SessionWindows}
* @return an instance of {@link SessionWindowedKStream}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
new file mode 100644
index 0000000..285c7c5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import java.time.Duration;
+import java.util.Objects;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
+/**
+ * A sliding window used for aggregating events.
+ * <p>
+ * Sliding Windows are defined based on a record's timestamp, the window size based on the given maximum time difference (inclusive) between
+ * records in the same window, and the given window grace period. While the window is sliding over the input data stream, a new window is
+ * created each time a record enters the sliding window or a record drops out of the sliding window.
+ * <p>
+ * Records that arrive after the grace period has elapsed will be ignored, i.e., a window is closed when
+ * {@code stream-time > window-end + grace-period}.
+ * <p>
+ * For example, if we have a time difference of 5000ms and the following data arrives:
+ * <pre>
+ * +--------------------------------------+
+ * | key | value | time |
+ * +-----------+-------------+------------+
+ * | A | 1 | 8000 |
+ * +-----------+-------------+------------+
+ * | A | 2 | 9200 |
+ * +-----------+-------------+------------+
+ * | A | 3 | 12400 |
+ * +-----------+-------------+------------+
+ * </pre>
+ * We'd have the following 5 windows:
+ * <ul>
+ * <li>window {@code [3000;8000]} contains [1] (created when first record enters the window)</li>
+ * <li>window {@code [4200;9200]} contains [1,2] (created when second record enters the window)</li>
+ * <li>window {@code [7400;12400]} contains [1,2,3] (created when third record enters the window)</li>
+ * <li>window {@code [8001;13001]} contains [2,3] (created when the first record drops out of the window)</li>
+ * <li>window {@code [9201;14201]} contains [3] (created when the second record drops out of the window)</li>
+ * </ul>
+ *<p>
+ * Note that while SlidingWindows are of a fixed size, as are {@link TimeWindows}, the start and end points of the window
+ * depend on when events occur in the stream (i.e., event timestamps), similar to {@link SessionWindows}.
+ * <p>
+ * For time semantics, see {@link TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see SessionWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see KGroupedStream#windowedBy(SlidingWindows)
+ * @see CogroupedKStream#windowedBy(SlidingWindows)
+ * @see TimestampExtractor
+ */
+
+public final class SlidingWindows {
+
+ /** The size of the windows in milliseconds, defined by the max time difference between records. */
+ private final long timeDifferenceMs;
+
+ /** The grace period in milliseconds. */
+ private final long graceMs;
+
+ // Private on purpose: instances are only handed out by the validating static factory below.
+ private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
+ this.timeDifferenceMs = timeDifferenceMs;
+ this.graceMs = graceMs;
+ }
+
+ /**
+ * Return a window definition with the window size based on the given maximum time difference (inclusive) between
+ * records in the same window and given window grace period. Reject out-of-order events that arrive after {@code grace}.
+ * A window is closed when {@code stream-time > window-end + grace-period}.
+ *
+ * @param timeDifference the max time difference (inclusive) between two records in a window
+ * @param grace the grace period to admit out-of-order events to a window
+ * @return a new window definition
+ * @throws IllegalArgumentException if the specified window size is &lt; 0 or grace &lt; 0, or either can't be represented as {@code long milliseconds}
+ */
+ public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference, final Duration grace) throws IllegalArgumentException {
+ final String msgPrefixSize = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+ final long timeDifferenceMs = ApiUtils.validateMillisecondDuration(timeDifference, msgPrefixSize);
+ if (timeDifferenceMs < 0) {
+ throw new IllegalArgumentException("Window time difference must not be negative.");
+ }
+ final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "grace");
+ final long graceMs = ApiUtils.validateMillisecondDuration(grace, msgPrefixGrace);
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Window grace period must not be negative.");
+ }
+ return new SlidingWindows(timeDifferenceMs, graceMs);
+ }
+
+ /**
+ * Return the window size, i.e. the maximum time difference between records in the same window.
+ *
+ * @return the time difference in milliseconds
+ */
+ public long timeDifferenceMs() {
+ return timeDifferenceMs;
+ }
+
+ /**
+ * Return the grace period that was configured for this window definition.
+ *
+ * @return the grace period in milliseconds
+ */
+ public long gracePeriodMs() {
+ return graceMs;
+ }
+
+ /**
+ * Two definitions are equal when both their time difference and their grace period match.
+ */
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final SlidingWindows that = (SlidingWindows) o;
+ return timeDifferenceMs == that.timeDifferenceMs &&
+ graceMs == that.graceMs;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(timeDifferenceMs, graceMs);
+ }
+
+ @Override
+ public String toString() {
+ // no separator before the first field: the previous form rendered "SlidingWindows{, sizeMs=..."
+ return "SlidingWindows{" +
+ "sizeMs=" + timeDifferenceMs +
+ ", graceMs=" + graceMs +
+ '}';
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
index 86ef133..ce15e7c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
@@ -108,6 +109,19 @@ public class CogroupedKStreamImpl<K, VOut> extends AbstractStream<K, VOut> imple
}
@Override
+ public TimeWindowedCogroupedKStream<K, VOut> windowedBy(final SlidingWindows slidingWindows) {
+ // The null guard is evaluated as the first constructor argument, i.e. before anything is built;
+ // a null definition is rejected with the same message as before.
+ final TimeWindowedCogroupedKStream<K, VOut> slidingWindowed = new SlidingWindowedCogroupedKStreamImpl<>(
+ Objects.requireNonNull(slidingWindows, "slidingWindows can't be null"),
+ builder,
+ subTopologySourceNodes,
+ name,
+ aggregateBuilder,
+ streamsGraphNode,
+ groupPatterns);
+ return slidingWindowed;
+ }
+
+ @Override
public SessionWindowedCogroupedKStream<K, VOut> windowedBy(final SessionWindows sessionWindows) {
Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
return new SessionWindowedCogroupedKStreamImpl<>(sessionWindows,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index 15051dd..03ef69d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
@@ -60,13 +61,13 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
- named.suffixWithOrElseGet(
- "-cogroup-agg-" + counter++,
- builder,
- CogroupedKStreamImpl.AGGREGATE_NAME),
- stateCreated,
- storeBuilder,
- new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+ named.suffixWithOrElseGet(
+ "-cogroup-agg-" + counter++,
+ builder,
+ CogroupedKStreamImpl.AGGREGATE_NAME),
+ stateCreated,
+ storeBuilder,
+ new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
@@ -132,6 +133,34 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
return createTable(processors, named, keySerde, valueSerde, queryableName);
}
+ /**
+ * Builds the cogrouped aggregation for sliding windows: one {@link KStreamSlidingWindowAggregate}
+ * processor node per grouped stream in {@code groupPatterns}, all sharing the store described by
+ * {@code storeBuilder}, combined into a single table via {@code createTable}.
+ *
+ * @param groupPatterns grouped streams mapped to the aggregator applied to each of them
+ * @param initializer supplies the initial aggregate handed to every window processor
+ * @param named naming used for the generated processor nodes
+ * @param storeBuilder builder of the state store shared by all created processors
+ * @param keySerde key serde passed through to {@code createTable}
+ * @param valueSerde value serde passed through to {@code createTable}
+ * @param queryableName queryable store name passed through to {@code createTable}
+ * @param slidingWindows the sliding window definition given to every processor
+ * @return the table produced by {@code createTable} for the created processor nodes
+ */
+ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+ final Initializer<VOut> initializer,
+ final NamedInternal named,
+ final StoreBuilder<?> storeBuilder,
+ final Serde<KR> keySerde,
+ final Serde<VOut> valueSerde,
+ final String queryableName,
+ final SlidingWindows slidingWindows) {
+ processRepartitions(groupPatterns, storeBuilder);
+ final Collection<StreamsGraphNode> processors = new ArrayList<>();
+ // false only for the first node: getStatefulProcessorNode gives that node the store builder itself,
+ // while later nodes only reference the store by name
+ boolean stateCreated = false;
+ // running index that keeps the "-cogroup-agg-N" suffixes distinct
+ int counter = 0;
+ for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+ final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+ named.suffixWithOrElseGet(
+ "-cogroup-agg-" + counter++,
+ builder,
+ CogroupedKStreamImpl.AGGREGATE_NAME),
+ stateCreated,
+ storeBuilder,
+ new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, kGroupedStream.getValue()));
+ stateCreated = true;
+ processors.add(statefulProcessorNode);
+ // attach the aggregate node below the parent node registered for this grouped stream
+ builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+ }
+ return createTable(processors, named, keySerde, valueSerde, queryableName);
+ }
+
private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final StoreBuilder<?> storeBuilder) {
for (final KGroupedStreamImpl<K, ?> repartitionReqs : groupPatterns.keySet()) {
@@ -174,7 +203,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
CogroupedKStreamImpl.MERGE_NAME);
final ProcessorSupplier<K, VOut> passThrough = new PassThrough<>();
final ProcessorGraphNode<K, VOut> mergeNode =
- new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
+ new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
builder.addGraphNode(processors, mergeNode);
@@ -196,18 +225,18 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final StatefulProcessorNode<K, ?> statefulProcessorNode;
if (!stateCreated) {
statefulProcessorNode =
- new StatefulProcessorNode<>(
- processorName,
- new ProcessorParameters<>(kStreamAggregate, processorName),
- storeBuilder
- );
+ new StatefulProcessorNode<>(
+ processorName,
+ new ProcessorParameters<>(kStreamAggregate, processorName),
+ storeBuilder
+ );
} else {
statefulProcessorNode =
- new StatefulProcessorNode<>(
- processorName,
- new ProcessorParameters<>(kStreamAggregate, processorName),
- new String[]{storeBuilder.name()}
- );
+ new StatefulProcessorNode<>(
+ processorName,
+ new ProcessorParameters<>(kStreamAggregate, processorName),
+ new String[]{storeBuilder.name()}
+ );
}
return statefulProcessorNode;
@@ -220,11 +249,11 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<?> valueSerde) {
KStreamImpl.createRepartitionedSource(builder,
- keySerde,
- (Serde<VIn>) valueSerde,
- repartitionTopicNamePrefix,
- null,
- (OptimizableRepartitionNodeBuilder<K, VIn>) optimizableRepartitionNodeBuilder);
+ keySerde,
+ (Serde<VIn>) valueSerde,
+ repartitionTopicNamePrefix,
+ null,
+ (OptimizableRepartitionNodeBuilder<K, VIn>) optimizableRepartitionNodeBuilder);
}
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index f8061ca..6c5cfc4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
@@ -203,6 +204,21 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
}
@Override
+ public TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows) {
+ // Fail fast with a descriptive message, the same way CogroupedKStreamImpl guards its
+ // SlidingWindows overload, instead of relying on whatever the constructor does with null.
+ // Fully qualified so this method does not depend on the file's import list.
+ java.util.Objects.requireNonNull(windows, "windows can't be null");
+ return new SlidingWindowedKStreamImpl<>(
+ windows,
+ builder,
+ subTopologySourceNodes,
+ name,
+ keySerde,
+ valueSerde,
+ aggregateBuilder,
+ streamsGraphNode
+ );
+ }
+
+ @Override
public SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows) {
return new SessionWindowedKStreamImpl<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
new file mode 100644
index 0000000..3ac58c3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+/**
+ * Processor supplier that aggregates the records of a stream into the sliding windows described by a
+ * {@link SlidingWindows} definition. Window aggregates are kept in the timestamped window store named
+ * {@code storeName} and every updated window is forwarded downstream.
+ */
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final String storeName;
+ private final SlidingWindows windows;
+ private final Initializer<Agg> initializer;
+ private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+ // set by enableSendingOldValues(); read when a processor is initialized and on every forward
+ private boolean sendOldValues = false;
+
+ /**
+ * @param windows the sliding window definition (window size and grace period)
+ * @param storeName name of the window store holding the aggregates
+ * @param initializer supplies the initial aggregate of a newly created window
+ * @param aggregator folds a record into a window's current aggregate
+ */
+ public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+ final String storeName,
+ final Initializer<Agg> initializer,
+ final Aggregator<? super K, ? super V, Agg> aggregator) {
+ this.windows = windows;
+ this.storeName = storeName;
+ this.initializer = initializer;
+ this.aggregator = aggregator;
+ }
+
+ /**
+ * @return a new processor instance bound to this supplier's configuration
+ */
+ @Override
+ public Processor<K, V> get() {
+ return new KStreamSlidingWindowAggregateProcessor();
+ }
+
+ /**
+ * @return the sliding window definition this supplier was created with
+ */
+ public SlidingWindows windows() {
+ return windows;
+ }
+
+ /**
+ * Makes the processors forward the previous aggregate together with the new one.
+ */
+ @Override
+ public void enableSendingOldValues() {
+ sendOldValues = true;
+ }
+
+ /**
+ * Processor that updates all windows affected by an incoming record and creates the windows that the
+ * record makes necessary.
+ */
+ private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+ private TimestampedWindowStore<K, Agg> windowStore;
+ private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+ private StreamsMetricsImpl metrics;
+ private InternalProcessorContext internalProcessorContext;
+ private Sensor lateRecordDropSensor;
+ private Sensor droppedRecordsSensor;
+ // highest record timestamp this processor has seen so far
+ private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+ /**
+ * Looks up the window store, sets up the two drop sensors and creates the forwarder.
+ *
+ * @param context the processor context; cast to {@link InternalProcessorContext} to reach the metrics
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ internalProcessorContext = (InternalProcessorContext) context;
+ metrics = internalProcessorContext.metrics();
+ final String threadId = Thread.currentThread().getName();
+ lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+ threadId,
+ context.taskId().toString(),
+ internalProcessorContext.currentNode().name(),
+ metrics
+ );
+ droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+ windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ windowStore,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
+ }
+
+ /**
+ * Entry point for each record. Records with a null key or value, and records whose timestamp is
+ * smaller than the window size, are logged, counted on the dropped-records sensor and skipped;
+ * everything else is handed to {@link #processInOrder(Object, Object, long)}.
+ *
+ * @param key the record key
+ * @param value the record value
+ */
+ @Override
+ public void process(final K key, final V value) {
+ if (key == null || value == null) {
+ log.warn(
+ "Skipping record due to null key or value. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ value, context().topic(), context().partition(), context().offset()
+ );
+ droppedRecordsSensor.record();
+ return;
+ }
+
+ final long timestamp = context().timestamp();
+ //don't process records that don't fall within a full sliding window:
+ //their left window [timestamp - timeDifference, timestamp] would start before time 0
+ if (timestamp < windows.timeDifferenceMs()) {
+ log.warn(
+ "Skipping record due to early arrival. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ value, context().topic(), context().partition(), context().offset()
+ );
+ droppedRecordsSensor.record();
+ return;
+ }
+ processInOrder(key, value, timestamp);
+ }
+
+ /**
+ * Applies one record to the window store: updates the stored windows that contain the record and
+ * then creates, where missing, the previous record's right window, the record's own left window
+ * {@code [timestamp - timeDifference, timestamp]} and the record's own right window starting at
+ * {@code timestamp + 1}.
+ *
+ * @param key the record key
+ * @param value the record value
+ * @param timestamp the record timestamp
+ */
+ public void processInOrder(final K key, final V value, final long timestamp) {
+
+ observedStreamTime = Math.max(observedStreamTime, timestamp);
+ // windows ending at or before this point are treated as expired by putAndForward
+ final long closeTime = observedStreamTime - windows.gracePeriodMs();
+
+ //store start times of windows we find
+ final Set<Long> windowStartTimes = new HashSet<>();
+
+ // aggregate that will go in the current record's left/right window (if needed)
+ ValueAndTimestamp<Agg> leftWinAgg = null;
+ ValueAndTimestamp<Agg> rightWinAgg = null;
+
+ //if current record's left/right windows already exist
+ boolean leftWinAlreadyCreated = false;
+ boolean rightWinAlreadyCreated = false;
+
+ // keep the left type window closest to the record
+ Window latestLeftTypeWindow = null;
+ // NOTE(review): the two time arguments are presumably bounds on the window start time - confirm
+ // against the window store contract
+ try (
+ final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+ key,
+ key,
+ timestamp - 2 * windows.timeDifferenceMs(),
+ // to catch the current record's right window, if it exists, without more calls to the store
+ timestamp + 1)
+ ) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+ final long startTime = next.key.window().start();
+ windowStartTimes.add(startTime);
+ final long endTime = startTime + windows.timeDifferenceMs();
+
+ if (endTime < timestamp) {
+ // window lies entirely before the record: remember its aggregate for the new left window
+ leftWinAgg = next.value;
+ if (isLeftWindow(next)) {
+ latestLeftTypeWindow = next.key.window();
+ }
+ } else if (endTime == timestamp) {
+ // the record's own left window is already in the store: just add the record to it
+ leftWinAlreadyCreated = true;
+ putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+ } else if (endTime > timestamp && startTime <= timestamp) {
+ // window spans the record: add the record and remember the aggregate for the new right window
+ rightWinAgg = next.value;
+ putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+ } else {
+ // remaining case, startTime > timestamp: taken to be the record's own right window
+ rightWinAlreadyCreated = true;
+ }
+ }
+ }
+
+ //create right window for previous record
+ if (latestLeftTypeWindow != null) {
+ final long rightWinStart = latestLeftTypeWindow.end() + 1;
+ if (!windowStartTimes.contains(rightWinStart)) {
+ final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+ putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+ }
+ }
+
+ //create left window for new record
+ if (!leftWinAlreadyCreated) {
+ final ValueAndTimestamp<Agg> valueAndTime;
+ //there's a right window that the new record could create --> new record's left window is not empty
+ if (latestLeftTypeWindow != null) {
+ // leftWinAgg is non-null here: it is assigned whenever latestLeftTypeWindow is
+ valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
+ } else {
+ valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+ }
+ final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
+ putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+ }
+ //create right window for new record
+ if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+ final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+ putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+ }
+ }
+
+ /**
+ * @return {@code true} if an aggregate spanning the record was found and its timestamp is later than
+ * the record's, i.e. the window starting right after the record would hold at least one record
+ */
+ private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) {
+ return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+ }
+
+ /**
+ * @return {@code true} if the window's end equals the timestamp stored with its aggregate
+ */
+ private boolean isLeftWindow(final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> window) {
+ return window.key.window().end() == window.value.timestamp();
+ }
+
+ /**
+ * Writes the updated aggregate of {@code window} to the store and forwards it, unless the window has
+ * already expired, in which case the update is logged and counted on the late-record sensor.
+ *
+ * @param window the window to update or create
+ * @param valueAndTime the aggregate (and its timestamp) the update starts from
+ * @param key the record key
+ * @param value the record value
+ * @param closeTime observed stream time minus the grace period; windows ending at or before it are expired
+ * @param timestamp the record timestamp
+ */
+ private void putAndForward(final Window window,
+ final ValueAndTimestamp<Agg> valueAndTime,
+ final K key,
+ final V value,
+ final long closeTime,
+ final long timestamp) {
+ final long windowStart = window.start();
+ final long windowEnd = window.end();
+ if (windowEnd > closeTime) {
+ //get aggregate from existing window
+ final Agg oldAgg = getValueOrNull(valueAndTime);
+ final Agg newAgg;
+ // keep old aggregate if adding a right window, else add new record's value
+ if (windowStart == timestamp + 1) {
+ newAgg = oldAgg;
+ } else {
+ newAgg = aggregator.apply(key, value, oldAgg);
+ }
+ // the stored timestamp is the latest record timestamp that contributed to the aggregate
+ final long newTimestamp = oldAgg == null ? timestamp : Math.max(timestamp, valueAndTime.timestamp());
+ windowStore.put(
+ key,
+ ValueAndTimestamp.make(newAgg, newTimestamp),
+ windowStart);
+ tupleForwarder.maybeForward(
+ new Windowed<K>(key, window),
+ newAgg,
+ sendOldValues ? oldAgg : null,
+ newTimestamp);
+ } else {
+ log.warn(
+ "Skipping record for expired window. " +
+ "key=[{}] " +
+ "topic=[{}] " +
+ "partition=[{}] " +
+ "offset=[{}] " +
+ "timestamp=[{}] " +
+ "window=[{},{}] " +
+ "expiration=[{}] " +
+ "streamTime=[{}]",
+ key,
+ context().topic(),
+ context().partition(),
+ context().offset(),
+ context().timestamp(),
+ windowStart, windowEnd,
+ closeTime,
+ observedStreamTime
+ );
+ lateRecordDropSensor.record();
+ }
+ }
+ }
+
+ /**
+ * @return a supplier of value getters that read window aggregates from the store named {@code storeName}
+ */
+ @Override
+ public KTableValueGetterSupplier<Windowed<K>, Agg> view() {
+ return new KTableValueGetterSupplier<Windowed<K>, Agg>() {
+
+ @Override
+ public KTableValueGetter<Windowed<K>, Agg> get() {
+ return new KStreamWindowAggregateValueGetter();
+ }
+
+ @Override
+ public String[] storeNames() {
+ return new String[] {storeName};
+ }
+ };
+ }
+
+ /**
+ * Value getter that looks up the aggregate of a single window by key and window start time.
+ */
+ private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> {
+ private TimestampedWindowStore<K, Agg> windowStore;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(final ProcessorContext context) {
+ windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+ }
+
+ // no cast happens here, so no unchecked-warning suppression is needed
+ @Override
+ public ValueAndTimestamp<Agg> get(final Windowed<K> windowedKey) {
+ final K key = windowedKey.key();
+ return windowStore.fetch(key, windowedKey.window().start());
+ }
+
+ @Override
+ public void close() {}
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
index 20a1b08..cc50131 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
@@ -88,22 +88,22 @@ public class SessionWindowedCogroupedKStreamImpl<K, V> extends
Objects.requireNonNull(materialized, "materialized can't be null");
Objects.requireNonNull(named, "named can't be null");
final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(
- materialized,
- builder,
- CogroupedKStreamImpl.AGGREGATE_NAME);
+ materialized,
+ builder,
+ CogroupedKStreamImpl.AGGREGATE_NAME);
return aggregateBuilder.build(
- groupPatterns,
- initializer,
- new NamedInternal(named),
- materialize(materializedInternal),
- materializedInternal.keySerde() != null ?
- new WindowedSerdes.SessionWindowedSerde<>(
- materializedInternal.keySerde()) :
- null,
- materializedInternal.valueSerde(),
- materializedInternal.queryableStoreName(),
- sessionWindows,
- sessionMerger);
+ groupPatterns,
+ initializer,
+ new NamedInternal(named),
+ materialize(materializedInternal),
+ materializedInternal.keySerde() != null ?
+ new WindowedSerdes.SessionWindowedSerde<>(
+ materializedInternal.keySerde()) :
+ null,
+ materializedInternal.valueSerde(),
+ materializedInternal.queryableStoreName(),
+ sessionWindows,
+ sessionMerger);
}
@@ -118,12 +118,12 @@ public class SessionWindowedCogroupedKStreamImpl<K, V> extends
if ((sessionWindows.inactivityGap() + sessionWindows.gracePeriodMs()) > retentionPeriod) {
throw new IllegalArgumentException("The retention period of the session store "
- + materialized.storeName()
- + " must be no smaller than the session inactivity gap plus the"
- + " grace period."
- + " Got gap=[" + sessionWindows.inactivityGap() + "],"
- + " grace=[" + sessionWindows.gracePeriodMs() + "],"
- + " retention=[" + retentionPeriod + "]");
+ + materialized.storeName()
+ + " must be no smaller than the session inactivity gap plus the"
+ + " grace period."
+ + " Got gap=[" + sessionWindows.inactivityGap() + "],"
+ + " grace=[" + sessionWindows.gracePeriodMs() + "],"
+ + " retention=[" + retentionPeriod + "]");
}
supplier = Stores.persistentSessionStore(
materialized.storeName(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
similarity index 50%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
index f6169bd..54ed885 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
@@ -23,37 +23,32 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
-import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
-
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends AbstractStream<K, V>
- implements TimeWindowedCogroupedKStream<K, V> {
-
- private final Windows<W> windows;
+public class SlidingWindowedCogroupedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedCogroupedKStream<K, V> {
+ private final SlidingWindows windows;
private final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder;
private final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns;
- TimeWindowedCogroupedKStreamImpl(final Windows<W> windows,
- final InternalStreamsBuilder builder,
- final Set<String> subTopologySourceNodes,
- final String name,
- final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder,
- final StreamsGraphNode streamsGraphNode,
- final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns) {
+ SlidingWindowedCogroupedKStreamImpl(final SlidingWindows windows,
+ final InternalStreamsBuilder builder,
+ final Set<String> subTopologySourceNodes,
+ final String name,
+ final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+ final StreamsGraphNode streamsGraphNode,
+ final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns) {
super(name, null, null, subTopologySourceNodes, streamsGraphNode, builder);
//keySerde and valueSerde are null because there are many different groupStreams that they could be from
this.windows = windows;
@@ -61,7 +56,6 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
this.groupPatterns = groupPatterns;
}
-
@Override
public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer) {
return aggregate(initializer, Materialized.with(null, null));
@@ -91,71 +85,41 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
builder,
CogroupedKStreamImpl.AGGREGATE_NAME);
return aggregateBuilder.build(
- groupPatterns,
- initializer,
- new NamedInternal(named),
- materialize(materializedInternal),
- materializedInternal.keySerde() != null ?
- new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size())
- : null,
- materializedInternal.valueSerde(),
- materializedInternal.queryableStoreName(),
- windows);
+ groupPatterns,
+ initializer,
+ new NamedInternal(named),
+ materialize(materializedInternal),
+ materializedInternal.keySerde() != null ?
+ new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs())
+ : null,
+ materializedInternal.valueSerde(),
+ materializedInternal.queryableStoreName(),
+ windows);
}
- @SuppressWarnings("deprecation")
- // continuing to support Windows#maintainMs/segmentInterval in fallback mode
- private StoreBuilder<TimestampedWindowStore<K, V>> materialize(
- final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized) {
+ private StoreBuilder<TimestampedWindowStore<K, V>> materialize(final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized) {
WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
- if (materialized.retention() != null) {
- // new style retention: use Materialized retention and default segmentInterval
- final long retentionPeriod = materialized.retention().toMillis();
-
- if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
- throw new IllegalArgumentException("The retention period of the window store "
- + name
- + " must be no smaller than its window size plus the grace period."
- + " Got size=[" + windows.size() + "],"
- + " grace=[" + windows.gracePeriodMs()
- + "],"
- + " retention=[" + retentionPeriod
- + "]");
- }
-
- supplier = Stores.persistentTimestampedWindowStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.size()),
- false
- );
-
- } else {
- // old style retention: use deprecated Windows retention/segmentInterval.
-
- // NOTE: in the future, when we remove Windows#maintainMs(), we should set the default retention
- // to be (windows.size() + windows.grace()). This will yield the same default behavior.
+ final long retentionPeriod = materialized.retention() != null ? materialized.retention().toMillis() : windows.gracePeriodMs() + 2 * windows.timeDifferenceMs();
+
+ if ((windows.timeDifferenceMs() * 2 + windows.gracePeriodMs()) > retentionPeriod) {
+ throw new IllegalArgumentException("The retention period of the window store "
+ + name
+ + " must be no smaller than 2 * time difference plus the grace period."
+ + " Got time difference=[" + windows.timeDifferenceMs() + "],"
+ + " grace=[" + windows.gracePeriodMs()
+ + "],"
+ + " retention=[" + retentionPeriod
+ + "]");
+ }
- if ((windows.size() + windows.gracePeriodMs()) > windows.maintainMs()) {
- throw new IllegalArgumentException("The retention period of the window store "
- + name
- + " must be no smaller than its window size plus the grace period."
- + " Got size=[" + windows.size() + "],"
- + " grace=[" + windows.gracePeriodMs()
- + "],"
- + " retention=[" + windows.maintainMs()
- + "]");
- }
+ supplier = Stores.persistentTimestampedWindowStore(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod),
+ Duration.ofMillis(windows.timeDifferenceMs()),
+ false
+ );
- supplier = new RocksDbWindowBytesStoreSupplier(
- materialized.storeName(),
- windows.maintainMs(),
- Math.max(windows.maintainMs() / (windows.segments - 1), 60_000L),
- windows.size(),
- false,
- true);
- }
}
final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores
.timestampedWindowStoreBuilder(
@@ -169,10 +133,12 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
} else {
builder.withLoggingDisabled();
}
-
if (materialized.cachingEnabled()) {
builder.withCachingEnabled();
+ } else {
+ builder.withCachingDisabled();
}
return builder;
}
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
similarity index 61%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
index aaf8fbc..0fc8da1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
@@ -25,38 +25,33 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
-import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
-
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
-
import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
-public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
-
- private final Windows<W> windows;
+public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
+ private final SlidingWindows windows;
private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
- TimeWindowedKStreamImpl(final Windows<W> windows,
- final InternalStreamsBuilder builder,
- final Set<String> subTopologySourceNodes,
- final String name,
- final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
- final StreamsGraphNode streamsGraphNode) {
+ SlidingWindowedKStreamImpl(final SlidingWindows windows,
+ final InternalStreamsBuilder builder,
+ final Set<String> subTopologySourceNodes,
+ final String name,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+ final StreamsGraphNode streamsGraphNode) {
super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder);
this.windows = Objects.requireNonNull(windows, "windows can't be null");
this.aggregateBuilder = aggregateBuilder;
@@ -72,7 +67,6 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
return doCount(named, Materialized.with(keySerde, Serdes.Long()));
}
-
@Override
public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
return count(NamedInternal.empty(), materialized);
@@ -81,20 +75,13 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
@Override
public KTable<Windowed<K>, Long> count(final Named named, final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(materialized, "materialized can't be null");
-
- // TODO: remove this when we do a topology-incompatible release
- // we used to burn a topology name here, so we have to keep doing it for compatibility
- if (new MaterializedInternal<>(materialized).storeName() == null) {
- builder.newStoreName(AGGREGATE_NAME);
- }
-
return doCount(named, materialized);
}
private KTable<Windowed<K>, Long> doCount(final Named named,
final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal =
- new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+ new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
if (materializedInternal.keySerde() == null) {
materializedInternal.withKeySerde(keySerde);
@@ -104,13 +91,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
}
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
return aggregateBuilder.build(
- new NamedInternal(aggregateName),
- materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
- materializedInternal.queryableStoreName(),
- materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
- materializedInternal.valueSerde());
+ new NamedInternal(aggregateName),
+ materialize(materializedInternal),
+ new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+ materializedInternal.queryableStoreName(),
+ materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null,
+ materializedInternal.valueSerde());
}
@Override
@@ -126,7 +114,6 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
return aggregate(initializer, aggregator, named, Materialized.with(keySerde, null));
}
-
@Override
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
@@ -143,19 +130,19 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal =
- new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+ new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
if (materializedInternal.keySerde() == null) {
materializedInternal.withKeySerde(keySerde);
}
-
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
return aggregateBuilder.build(
- new NamedInternal(aggregateName),
- materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
- materializedInternal.queryableStoreName(),
- materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
- materializedInternal.valueSerde());
+ new NamedInternal(aggregateName),
+ materialize(materializedInternal),
+ new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
+ materializedInternal.queryableStoreName(),
+ materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null,
+ materializedInternal.valueSerde());
}
@Override
@@ -183,7 +170,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal =
- new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
+ new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
if (materializedInternal.keySerde() == null) {
materializedInternal.withKeySerde(keySerde);
@@ -193,65 +180,41 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
}
final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+
return aggregateBuilder.build(
- new NamedInternal(reduceName),
- materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
- materializedInternal.queryableStoreName(),
- materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
- materializedInternal.valueSerde());
+ new NamedInternal(reduceName),
+ materialize(materializedInternal),
+ new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
+ materializedInternal.queryableStoreName(),
+ materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null,
+ materializedInternal.valueSerde());
}
- @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
- if (materialized.retention() != null) {
- // new style retention: use Materialized retention and default segmentInterval
- final long retentionPeriod = materialized.retention().toMillis();
-
- if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
- throw new IllegalArgumentException("The retention period of the window store "
- + name + " must be no smaller than its window size plus the grace period."
- + " Got size=[" + windows.size() + "],"
- + " grace=[" + windows.gracePeriodMs() + "],"
- + " retention=[" + retentionPeriod + "]");
- }
-
- supplier = Stores.persistentTimestampedWindowStore(
+ final long retentionPeriod = materialized.retention() != null ? materialized.retention().toMillis() : windows.gracePeriodMs() + 2 * windows.timeDifferenceMs();
+
+ // large retention time to ensure that all existing windows needed to create new sliding windows can be accessed
+ // earliest window start time we could need to create corresponding right window would be recordTime - 2 * timeDifference
+ if ((windows.timeDifferenceMs() * 2 + windows.gracePeriodMs()) > retentionPeriod) {
+ throw new IllegalArgumentException("The retention period of the window store "
+ + name + " must be no smaller than 2 * time difference plus the grace period."
+ + " Got time difference=[" + windows.timeDifferenceMs() + "],"
+ + " grace=[" + windows.gracePeriodMs() + "],"
+ + " retention=[" + retentionPeriod + "]");
+ }
+ supplier = Stores.persistentTimestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.size()),
+ Duration.ofMillis(windows.timeDifferenceMs()),
false
- );
-
- } else {
- // old style retention: use deprecated Windows retention/segmentInterval.
-
- // NOTE: in the future, when we remove Windows#maintainMs(), we should set the default retention
- // to be (windows.size() + windows.grace()). This will yield the same default behavior.
-
- if ((windows.size() + windows.gracePeriodMs()) > windows.maintainMs()) {
- throw new IllegalArgumentException("The retention period of the window store "
- + name + " must be no smaller than its window size plus the grace period."
- + " Got size=[" + windows.size() + "],"
- + " grace=[" + windows.gracePeriodMs() + "],"
- + " retention=[" + windows.maintainMs() + "]");
- }
-
- supplier = new RocksDbWindowBytesStoreSupplier(
- materialized.storeName(),
- windows.maintainMs(),
- Math.max(windows.maintainMs() / (windows.segments - 1), 60_000L),
- windows.size(),
- false,
- true);
- }
+ );
}
final StoreBuilder<TimestampedWindowStore<K, VR>> builder = Stores.timestampedWindowStoreBuilder(
- supplier,
- materialized.keySerde(),
- materialized.valueSerde()
+ supplier,
+ materialized.keySerde(),
+ materialized.valueSerde()
);
if (materialized.loggingEnabled()) {
@@ -259,9 +222,10 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
} else {
builder.withLoggingDisabled();
}
-
if (materialized.cachingEnabled()) {
builder.withCachingEnabled();
+ } else {
+ builder.withCachingDisabled();
}
return builder;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
index f6169bd..299c2d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
@@ -41,7 +41,7 @@ import java.util.Objects;
import java.util.Set;
public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends AbstractStream<K, V>
- implements TimeWindowedCogroupedKStream<K, V> {
+ implements TimeWindowedCogroupedKStream<K, V> {
private final Windows<W> windows;
private final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder;
@@ -91,22 +91,22 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
builder,
CogroupedKStreamImpl.AGGREGATE_NAME);
return aggregateBuilder.build(
- groupPatterns,
- initializer,
- new NamedInternal(named),
- materialize(materializedInternal),
- materializedInternal.keySerde() != null ?
- new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size())
- : null,
- materializedInternal.valueSerde(),
- materializedInternal.queryableStoreName(),
- windows);
+ groupPatterns,
+ initializer,
+ new NamedInternal(named),
+ materialize(materializedInternal),
+ materializedInternal.keySerde() != null ?
+ new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size())
+ : null,
+ materializedInternal.valueSerde(),
+ materializedInternal.queryableStoreName(),
+ windows);
}
@SuppressWarnings("deprecation")
// continuing to support Windows#maintainMs/segmentInterval in fallback mode
private StoreBuilder<TimestampedWindowStore<K, V>> materialize(
- final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized) {
+ final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized) {
WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
if (materialized.retention() != null) {
@@ -115,13 +115,13 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
throw new IllegalArgumentException("The retention period of the window store "
- + name
- + " must be no smaller than its window size plus the grace period."
- + " Got size=[" + windows.size() + "],"
- + " grace=[" + windows.gracePeriodMs()
- + "],"
- + " retention=[" + retentionPeriod
- + "]");
+ + name
+ + " must be no smaller than its window size plus the grace period."
+ + " Got size=[" + windows.size() + "],"
+ + " grace=[" + windows.gracePeriodMs()
+ + "],"
+ + " retention=[" + retentionPeriod
+ + "]");
}
supplier = Stores.persistentTimestampedWindowStore(
@@ -139,13 +139,13 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
if ((windows.size() + windows.gracePeriodMs()) > windows.maintainMs()) {
throw new IllegalArgumentException("The retention period of the window store "
- + name
- + " must be no smaller than its window size plus the grace period."
- + " Got size=[" + windows.size() + "],"
- + " grace=[" + windows.gracePeriodMs()
- + "],"
- + " retention=[" + windows.maintainMs()
- + "]");
+ + name
+ + " must be no smaller than its window size plus the grace period."
+ + " Got size=[" + windows.size() + "],"
+ + " grace=[" + windows.gracePeriodMs()
+ + "],"
+ + " retention=[" + windows.maintainMs()
+ + "]");
}
supplier = new RocksDbWindowBytesStoreSupplier(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index aaf8fbc..3e6b03e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -104,13 +104,16 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
}
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
return aggregateBuilder.build(
- new NamedInternal(aggregateName),
- materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
- materializedInternal.queryableStoreName(),
- materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
- materializedInternal.valueSerde());
+ new NamedInternal(aggregateName),
+ materialize(materializedInternal),
+ new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+ materializedInternal.queryableStoreName(),
+ materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
+ materializedInternal.valueSerde());
+
+
}
@Override
@@ -149,13 +152,16 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
}
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
return aggregateBuilder.build(
- new NamedInternal(aggregateName),
- materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
- materializedInternal.queryableStoreName(),
- materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
- materializedInternal.valueSerde());
+ new NamedInternal(aggregateName),
+ materialize(materializedInternal),
+ new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
+ materializedInternal.queryableStoreName(),
+ materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
+ materializedInternal.valueSerde());
+
+
}
@Override
@@ -193,13 +199,16 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
}
final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+
return aggregateBuilder.build(
- new NamedInternal(reduceName),
- materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
- materializedInternal.queryableStoreName(),
- materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
- materializedInternal.valueSerde());
+ new NamedInternal(reduceName),
+ materialize(materializedInternal),
+ new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
+ materializedInternal.queryableStoreName(),
+ materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
+ materializedInternal.valueSerde());
+
+
}
@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
index 2fb28dd..e9a4796 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
@@ -18,8 +18,10 @@ package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
+import org.apache.kafka.streams.kstream.internals.KStreamSlidingWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -79,6 +81,10 @@ public final class GraphGraceSearchUtil {
final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier;
final SessionWindows windows = kStreamSessionWindowAggregate.windows();
return windows.gracePeriodMs() + windows.inactivityGap();
+ } else if (processorSupplier instanceof KStreamSlidingWindowAggregate) {
+ final KStreamSlidingWindowAggregate kStreamSlidingWindowAggregate = (KStreamSlidingWindowAggregate) processorSupplier;
+ final SlidingWindows windows = kStreamSlidingWindowAggregate.windows();
+ return windows.gracePeriodMs();
} else {
return null;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 7de4f62..5cd08f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -46,6 +46,7 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Transformer;
@@ -87,6 +88,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.time.Duration.ofMillis;
+import static java.time.Duration.ofMinutes;
import static java.time.Instant.ofEpochMilli;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -460,6 +462,208 @@ public class KStreamAggregationIntegrationTest {
}
@Test
+ public void shouldReduceSlidingWindows() throws Exception {
+ // Three batches are produced at t, t + 250 and t + 400, i.e. all within one
+ // timeDifference (500 ms) of each other, so the sliding windows they create overlap.
+ final long firstBatchTimestamp = mockTime.milliseconds();
+ final long timeDifference = 500L;
+ produceMessages(firstBatchTimestamp);
+ final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2;
+ produceMessages(secondBatchTimestamp);
+ final long thirdBatchTimestamp = firstBatchTimestamp + timeDifference - 100L;
+ produceMessages(thirdBatchTimestamp);
+
+ final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+ groupedStream
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L)))
+ .reduce(reducer)
+ .toStream()
+ .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
+
+ startStreams();
+
+ // expectResult below lists 30 records (6 window updates for each of the keys A-E).
+ // Request all of them so the exact list comparison never runs on a partial result.
+ final int expectedRecordCount = 30;
+
+ final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput = receiveMessages(
+ new TimeWindowedDeserializer<>(new StringDeserializer(), timeDifference),
+ new StringDeserializer(),
+ String.class,
+ expectedRecordCount);
+
+ final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+ new TimeWindowedDeserializer<String>(),
+ new StringDeserializer(),
+ String.class,
+ expectedRecordCount,
+ true);
+
+ // Order by record key, then by reduced value. List.sort is stable, so records with the
+ // same key and value keep the order in which they were received.
+ final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+ Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+ .thenComparing(KeyValueTimestamp::value);
+
+ windowedOutput.sort(comparator);
+ final long firstBatchLeftWindowStart = firstBatchTimestamp - timeDifference;
+ final long firstBatchRightWindowStart = firstBatchTimestamp + 1;
+
+ final long secondBatchLeftWindowStart = secondBatchTimestamp - timeDifference;
+ final long secondBatchRightWindowStart = secondBatchTimestamp + 1;
+
+ final long thirdBatchLeftWindowStart = thirdBatchTimestamp - timeDifference;
+
+ final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList(
+ // A @ firstBatchTimestamp left window created when A @ firstBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "A", firstBatchTimestamp),
+ // A @ firstBatchTimestamp right window created when A @ secondBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "A", secondBatchTimestamp),
+ // A @ secondBatchTimestamp right window created when A @ thirdBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "A", thirdBatchTimestamp),
+ // A @ secondBatchTimestamp left window created when A @ secondBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "A:A", secondBatchTimestamp),
+ // A @ firstBatchTimestamp right window updated when A @ thirdBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "A:A", thirdBatchTimestamp),
+ // A @ thirdBatchTimestamp left window created when A @ thirdBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "A:A:A", thirdBatchTimestamp),
+
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "B", firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "B", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "B", thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "B:B", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "B:B", thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "B:B:B", thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "C", firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "C", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "C", thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "C:C", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "C:C", thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "C:C:C", thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "D", firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "D", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "D", thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "D:D", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "D:D", thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "D:D:D", thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "E", firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "E", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "E", thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "E:E", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "E:E", thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "E:E:E", thirdBatchTimestamp)
+ );
+ assertThat(windowedOutput, is(expectResult));
+
+ final Set<String> expectResultString = new HashSet<>(expectResult.size());
+ for (final KeyValueTimestamp<Windowed<String>, String> eachRecord: expectResult) {
+ expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", "
+ + eachRecord.key() + ", " + eachRecord.value());
+ }
+
+ // check every message is contained in the expect result
+ final String[] allRecords = resultFromConsoleConsumer.split("\n");
+ for (final String record: allRecords) {
+ assertTrue(expectResultString.contains(record));
+ }
+ }
+
+ @Test
+ public void shouldAggregateSlidingWindows() throws Exception {
+ // Three batches are produced at t, t + 250 and t + 400, i.e. all within one
+ // timeDifference (500 ms) of each other, so the sliding windows they create overlap.
+ final long firstBatchTimestamp = mockTime.milliseconds();
+ final long timeDifference = 500L;
+ produceMessages(firstBatchTimestamp);
+ final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2;
+ produceMessages(secondBatchTimestamp);
+ final long thirdBatchTimestamp = firstBatchTimestamp + timeDifference - 100L;
+ produceMessages(thirdBatchTimestamp);
+
+ final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+ // Use timeDifference here so the window size and the expected window starts computed
+ // below cannot drift apart.
+ groupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMinutes(5)))
+ .aggregate(
+ initializer,
+ aggregator,
+ Materialized.with(null, Serdes.Integer())
+ )
+ .toStream()
+ .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
+
+ startStreams();
+
+ // expectResult below lists 30 records (6 window updates for each of the keys A-E).
+ // Request all of them so the exact list comparison never runs on a partial result.
+ final int expectedRecordCount = 30;
+
+ final List<KeyValueTimestamp<Windowed<String>, Integer>> windowedMessages = receiveMessagesWithTimestamp(
+ new TimeWindowedDeserializer<>(),
+ new IntegerDeserializer(),
+ String.class,
+ expectedRecordCount);
+
+ // read from ConsoleConsumer
+ final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+ new TimeWindowedDeserializer<String>(),
+ new IntegerDeserializer(),
+ String.class,
+ expectedRecordCount,
+ true);
+
+ // Order by record key, then by aggregate value. List.sort is stable, so records with the
+ // same key and value keep the order in which they were received.
+ final Comparator<KeyValueTimestamp<Windowed<String>, Integer>> comparator =
+ Comparator.comparing((KeyValueTimestamp<Windowed<String>, Integer> o) -> o.key().key())
+ .thenComparingInt(KeyValueTimestamp::value);
+ windowedMessages.sort(comparator);
+
+ final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference;
+ final long firstBatchRightWindow = firstBatchTimestamp + 1;
+ final long secondBatchLeftWindow = secondBatchTimestamp - timeDifference;
+ final long secondBatchRightWindow = secondBatchTimestamp + 1;
+ final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference;
+
+ final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult = Arrays.asList(
+ // A @ firstBatchTimestamp left window created when A @ firstBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp),
+ // A @ firstBatchTimestamp right window created when A @ secondBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp),
+ // A @ secondBatchTimestamp right window created when A @ thirdBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp),
+ // A @ secondBatchTimestamp left window created when A @ secondBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp),
+ // A @ firstBatchTimestamp right window updated when A @ thirdBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp),
+ // A @ thirdBatchTimestamp left window created when A @ thirdBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp),
+
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp)
+ );
+
+ assertThat(windowedMessages, is(expectResult));
+
+ final Set<String> expectResultString = new HashSet<>(expectResult.size());
+ for (final KeyValueTimestamp<Windowed<String>, Integer> eachRecord: expectResult) {
+ expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", " + eachRecord.key() + ", " + eachRecord.value());
+ }
+
+ // check every message is contained in the expect result
+ final String[] allRecords = resultFromConsoleConsumer.split("\n");
+ for (final String record: allRecords) {
+ assertTrue(expectResultString.contains(record));
+ }
+ }
+
+ @Test
public void shouldCountSessionWindows() throws Exception {
final long sessionGap = 5 * 60 * 1000L;
final List<KeyValue<String, String>> t1Messages = Arrays.asList(new KeyValue<>("bob", "start"),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
new file mode 100644
index 0000000..f6c63a3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.junit.Test;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
+import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+/**
+ * Unit tests for {@code SlidingWindows.withTimeDifferenceAndGrace}: the configured time
+ * difference and grace period are readable back, negative durations are rejected with
+ * {@link IllegalArgumentException}, and equals/hashCode depend on both values.
+ */
+public class SlidingWindowsTest {
+
+ // Arbitrary positive duration (in ms) used where the concrete value does not matter.
+ private static final long ANY_SIZE = 123L;
+
+ @Test
+ public void shouldSetTimeDifference() {
+ assertEquals(ANY_SIZE, SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE), ofMillis(3)).timeDifferenceMs());
+ }
+
+ @Test
+ public void timeDifferenceMustNotBeNegative() {
+ assertThrows(IllegalArgumentException.class, () -> SlidingWindows.withTimeDifferenceAndGrace(ofMillis(-1), ofMillis(5)));
+ }
+
+ @Test
+ public void shouldSetGracePeriod() {
+ assertEquals(ANY_SIZE, SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(ANY_SIZE)).gracePeriodMs());
+ }
+
+ @Test
+ public void gracePeriodMustNotBeNegative() {
+ assertThrows(IllegalArgumentException.class, () -> SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(-1)));
+ }
+
+ @Test
+ public void equalsAndHashcodeShouldBeValidForPositiveCases() {
+ // Both values are random in [1, 19]; two instances built from the same pair must be equal.
+ final long grace = 1L + (long) (Math.random() * (20L - 1L));
+ final long timeDifference = 1L + (long) (Math.random() * (20L - 1L));
+ verifyEquality(
+ SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(grace)),
+ SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(grace))
+ );
+ }
+
+ @Test
+ public void equalsAndHashcodeShouldNotBeEqualForDifferentTimeDifference() {
+ // The two time differences come from disjoint ranges ([1, 9] and [21, 40]), so they
+ // can never collide even though they are random.
+ final long grace = 1L + (long) (Math.random() * (10L - 1L));
+ final long timeDifferenceOne = 1L + (long) (Math.random() * (10L - 1L));
+ final long timeDifferenceTwo = 21L + (long) (Math.random() * (41L - 21L));
+ verifyInEquality(
+ SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifferenceOne), ofMillis(grace)),
+ SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifferenceTwo), ofMillis(grace))
+ );
+ }
+
+ @Test
+ public void equalsAndHashcodeShouldNotBeEqualForDifferentGracePeriod() {
+ // The two grace periods come from disjoint ranges ([1, 9] and [21, 40]), so they
+ // can never collide even though they are random.
+ final long timeDifference = 1L + (long) (Math.random() * (10L - 1L));
+ final long graceOne = 1L + (long) (Math.random() * (10L - 1L));
+ final long graceTwo = 21L + (long) (Math.random() * (41L - 21L));
+ verifyInEquality(
+ SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(graceOne)),
+ SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(graceTwo))
+ );
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
index d780fa7..eb73b77 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -43,6 +44,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -85,64 +87,69 @@ public class CogroupedKStreamImplTest {
cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER);
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowNPEInCogroupIfKGroupedStreamIsNull() {
- cogroupedStream.cogroup(null, MockAggregator.TOSTRING_ADDER);
+ assertThrows(NullPointerException.class, () -> cogroupedStream.cogroup(null, MockAggregator.TOSTRING_ADDER));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullAggregatorOnCogroup() {
- cogroupedStream.cogroup(groupedStream, null);
+ assertThrows(NullPointerException.class, () -> cogroupedStream.cogroup(groupedStream, null));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullInitializerOnAggregate() {
- cogroupedStream.aggregate(null);
+ assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(null));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullInitializerOnAggregateWitNamed() {
- cogroupedStream.aggregate(null, Named.as("name"));
+ assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(null, Named.as("name")));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullInitializerOnAggregateWitMaterialized() {
- cogroupedStream.aggregate(null, Materialized.as("store"));
+ assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(null, Materialized.as("store")));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullInitializerOnAggregateWitNamedAndMaterialized() {
- cogroupedStream.aggregate(null, Named.as("name"), Materialized.as("store"));
+ assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(null, Named.as("name"), Materialized.as("store")));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullNamedOnAggregate() {
- cogroupedStream.aggregate(STRING_INITIALIZER, (Named) null);
+ assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(STRING_INITIALIZER, (Named) null));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullMaterializedOnAggregate() {
- cogroupedStream.aggregate(STRING_INITIALIZER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null);
+ assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(STRING_INITIALIZER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullNamedOnAggregateWithMateriazlied() {
- cogroupedStream.aggregate(STRING_INITIALIZER, null, Materialized.as("store"));
+ assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(STRING_INITIALIZER, null, Materialized.as("store")));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullMaterializedOnAggregateWithNames() {
- cogroupedStream.aggregate(STRING_INITIALIZER, Named.as("name"), null);
+ assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(STRING_INITIALIZER, Named.as("name"), null));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullWindowOnWindowedByTime() {
- cogroupedStream.windowedBy((Windows<? extends Window>) null);
+ assertThrows(NullPointerException.class, () -> cogroupedStream.windowedBy((Windows<? extends Window>) null));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullWindowOnWindowedBySession() {
- cogroupedStream.windowedBy((SessionWindows) null);
+ assertThrows(NullPointerException.class, () -> cogroupedStream.windowedBy((SessionWindows) null));
+ }
+
+ @Test
+ public void shouldNotHaveNullWindowOnWindowedBySliding() {
+ // The variable's static type selects the SlidingWindows overload of windowedBy.
+ final SlidingWindows nullWindows = null;
+ assertThrows(NullPointerException.class, () -> cogroupedStream.windowedBy(nullWindows));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 34de30c..a23b98d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
@@ -61,6 +62,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
public class KGroupedStreamImplTest {
@@ -77,82 +80,212 @@ public class KGroupedStreamImplTest {
groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullAggregatorOnCogroup() {
- groupedStream.cogroup(null);
+ assertThrows(NullPointerException.class, () -> groupedStream.cogroup(null));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullReducerOnReduce() {
- groupedStream.reduce(null);
+ assertThrows(NullPointerException.class, () -> groupedStream.reduce(null));
}
- @Test(expected = TopologyException.class)
+ @Test
public void shouldNotHaveInvalidStoreNameOnReduce() {
- groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
+ assertThrows(TopologyException.class, () -> groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullReducerWithWindowedReduce() {
- groupedStream
- .windowedBy(TimeWindows.of(ofMillis(10)))
- .reduce(null, Materialized.as("store"));
+ assertThrows(NullPointerException.class, () -> groupedStream
+ .windowedBy(TimeWindows.of(ofMillis(10)))
+ .reduce(null, Materialized.as("store")));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullWindowsWithWindowedReduce() {
- groupedStream.windowedBy((Windows<?>) null);
+ assertThrows(NullPointerException.class, () -> groupedStream.windowedBy((Windows<?>) null));
}
- @Test(expected = TopologyException.class)
+ @Test
public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
- groupedStream
- .windowedBy(TimeWindows.of(ofMillis(10)))
- .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
+ assertThrows(TopologyException.class, () -> groupedStream
+ .windowedBy(TimeWindows.of(ofMillis(10)))
+ .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullInitializerOnAggregate() {
- groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store"));
+ assertThrows(NullPointerException.class, () -> groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullAdderOnAggregate() {
- groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store"));
+ assertThrows(NullPointerException.class, () -> groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
}
- @Test(expected = TopologyException.class)
+ @Test
public void shouldNotHaveInvalidStoreNameOnAggregate() {
- groupedStream.aggregate(
- MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- Materialized.as(INVALID_STORE_NAME));
+ assertThrows(TopologyException.class, () -> groupedStream.aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.as(INVALID_STORE_NAME)));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullInitializerOnWindowedAggregate() {
- groupedStream
- .windowedBy(TimeWindows.of(ofMillis(10)))
- .aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store"));
+ assertThrows(NullPointerException.class, () -> groupedStream
+ .windowedBy(TimeWindows.of(ofMillis(10)))
+ .aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullAdderOnWindowedAggregate() {
- groupedStream
- .windowedBy(TimeWindows.of(ofMillis(10)))
- .aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store"));
+ assertThrows(NullPointerException.class, () -> groupedStream
+ .windowedBy(TimeWindows.of(ofMillis(10)))
+ .aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotHaveNullWindowsOnWindowedAggregate() {
- groupedStream.windowedBy((Windows<?>) null);
+ assertThrows(NullPointerException.class, () -> groupedStream.windowedBy((Windows<?>) null));
}
- @Test(expected = TopologyException.class)
+ @Test
public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
+ assertThrows(TopologyException.class, () -> groupedStream
+ .windowedBy(TimeWindows.of(ofMillis(10)))
+ .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
+ }
+
+ @Test
+ public void shouldNotHaveNullReducerWithSlidingWindowedReduce() {
+ // Valid windows are built up front; the null reducer is the only invalid argument.
+ final SlidingWindows slidingWindows = SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100));
+ assertThrows(
+ NullPointerException.class,
+ () -> groupedStream.windowedBy(slidingWindows).reduce(null, Materialized.as("store")));
+ }
+
+ @Test
+ public void shouldNotHaveNullWindowsWithSlidingWindowedReduce() {
+ // The variable's static type selects the SlidingWindows overload of windowedBy.
+ final SlidingWindows nullWindows = null;
+ assertThrows(NullPointerException.class, () -> groupedStream.windowedBy(nullWindows));
+ }
+
+ @Test
+ public void shouldNotHaveInvalidStoreNameWithSlidingWindowedReduce() {
+ final SlidingWindows slidingWindows = SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100));
+ // Materialized.as(INVALID_STORE_NAME) stays inside the lambda so the exception is
+ // caught by assertThrows regardless of which call in the chain raises it.
+ assertThrows(
+ TopologyException.class,
+ () -> groupedStream.windowedBy(slidingWindows).reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
+ }
+
+ @Test
+ public void shouldNotHaveNullInitializerOnSlidingWindowedAggregate() {
+ // Valid windows are built up front; the null initializer is the only invalid argument.
+ final SlidingWindows slidingWindows = SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100));
+ assertThrows(
+ NullPointerException.class,
+ () -> groupedStream.windowedBy(slidingWindows).aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
+ }
+
+ @Test
+ public void shouldNotHaveNullAdderOnSlidingWindowedAggregate() {
+ // Valid windows are built up front; the null aggregator is the only invalid argument.
+ final SlidingWindows slidingWindows = SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100));
+ assertThrows(
+ NullPointerException.class,
+ () -> groupedStream.windowedBy(slidingWindows).aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
+ }
+
+ @Test
+ public void shouldNotHaveInvalidStoreNameOnSlidingWindowedAggregate() {
+ final SlidingWindows slidingWindows = SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100));
+ // Materialized.as(INVALID_STORE_NAME) stays inside the lambda so the exception is
+ // caught by assertThrows regardless of which call in the chain raises it.
+ assertThrows(
+ TopologyException.class,
+ () -> groupedStream
+ .windowedBy(slidingWindows)
+ .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
+ }
+
+ @Test
+ public void shouldCountSlidingWindows() {
+ final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
groupedStream
- .windowedBy(TimeWindows.of(ofMillis(10)))
- .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME));
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
+ .count(Materialized.as("aggregate-by-key-windowed"))
+ .toStream()
+ .process(supplier);
+
+ doCountSlidingWindows(supplier);
+ }
+
+ @Test
+ public void shouldCountSlidingWindowsWithInternalStoreName() {
+ // count() is called without a Materialized argument, so no store name is supplied here.
+ final SlidingWindows slidingWindows = SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L));
+ final MockProcessorSupplier<Windowed<String>, Long> countSupplier = new MockProcessorSupplier<>();
+
+ groupedStream.windowedBy(slidingWindows).count().toStream().process(countSupplier);
+
+ doCountSlidingWindows(countSupplier);
+ }
+
+ /**
+ * Pipes records for keys "1", "2" and "3" through a TopologyTestDriver and asserts the
+ * exact sequence of windowed counts captured by the supplier's processor.
+ * The input includes out-of-order records ("1" at 600 after 999, "3" at 600 after the
+ * stream has reached 1000) and duplicate timestamps (two records at 1000 for "1" and "2").
+ * The window bounds below match the 500 ms time difference configured by both callers.
+ *
+ * @param supplier the processor supplier attached to the windowed count output
+ */
+ private void doCountSlidingWindows(final MockProcessorSupplier<Windowed<String>, Long> supplier) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
+ inputTopic.pipeInput("1", "A", 500L);
+ inputTopic.pipeInput("1", "A", 999L);
+ inputTopic.pipeInput("1", "A", 600L);
+ inputTopic.pipeInput("2", "B", 500L);
+ inputTopic.pipeInput("2", "B", 600L);
+ inputTopic.pipeInput("2", "B", 700L);
+ inputTopic.pipeInput("3", "C", 501L);
+ inputTopic.pipeInput("1", "A", 1000L);
+ inputTopic.pipeInput("1", "A", 1000L);
+ inputTopic.pipeInput("2", "B", 1000L);
+ inputTopic.pipeInput("2", "B", 1000L);
+ inputTopic.pipeInput("3", "C", 600L);
+ }
+ // Each entry is (windowed key, count in that window, record timestamp), listed in the
+ // order the updates are expected to be emitted.
+ assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
+ // processing A@500
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
+ // processing A@999
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
+ // processing A@600
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
+ // processing B@500
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
+ // processing B@600
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
+ // processing B@700
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
+ // processing C@501
+ new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
+ // processing first A@1000
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
+ // processing second A@1000
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
+ // processing first B@1000
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
+ // processing second B@1000
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
+ // processing C@600
+ new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
+ new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
+
+ )));
}
private void doAggregateSessionWindows(final MockProcessorSupplier<Windowed<String>, Integer> supplier) {
@@ -305,70 +438,61 @@ public class KGroupedStreamImplTest {
assertNull(table.queryableStoreName());
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
- groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
- .reduce(null, Materialized.as("store"));
+ assertThrows(NullPointerException.class, () -> groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .reduce(null, Materialized.as("store")));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() {
- groupedStream.windowedBy((SessionWindows) null);
+ assertThrows(NullPointerException.class, () -> groupedStream.windowedBy((SessionWindows) null));
}
- @Test(expected = TopologyException.class)
+ @Test
public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
- groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
- .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
+ assertThrows(TopologyException.class, () -> groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME))
+ );
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
- groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
- .reduce(
- null,
- Materialized.<String, String, SessionStore<Bytes, byte[]>>as(null));
+ assertThrows(NullPointerException.class, () -> groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .reduce(null, Materialized.<String, String, SessionStore<Bytes, byte[]>>as(null))
+ );
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
- groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
- .aggregate(
- null,
- MockAggregator.TOSTRING_ADDER,
- (aggKey, aggOne, aggTwo) -> null,
- Materialized.as("storeName"));
+ assertThrows(NullPointerException.class, () -> groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .aggregate(null, MockAggregator.TOSTRING_ADDER, (aggKey, aggOne, aggTwo) -> null, Materialized.as("storeName"))
+ );
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
- groupedStream.
- windowedBy(SessionWindows.with(ofMillis(30)))
- .aggregate(
- MockInitializer.STRING_INIT,
- null,
- (aggKey, aggOne, aggTwo) -> null,
- Materialized.as("storeName"));
+ assertThrows(NullPointerException.class, () -> groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .aggregate(MockInitializer.STRING_INIT, null, (aggKey, aggOne, aggTwo) -> null, Materialized.as("storeName"))
+ );
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
- groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
- .aggregate(
- MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- null,
- Materialized.as("storeName"));
+ assertThrows(NullPointerException.class, () -> groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Materialized.as("storeName"))
+ );
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() {
- groupedStream.windowedBy((SessionWindows) null);
+ assertThrows(NullPointerException.class, () -> groupedStream.windowedBy((SessionWindows) null));
}
@Test
@@ -376,36 +500,33 @@ public class KGroupedStreamImplTest {
groupedStream
.windowedBy(SessionWindows.with(ofMillis(10)))
.aggregate(
- MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- (aggKey, aggOne, aggTwo) -> null,
- Materialized.with(Serdes.String(), Serdes.String()));
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ (aggKey, aggOne, aggTwo) -> null, Materialized.with(Serdes.String(), Serdes.String())
+ );
}
- @Test(expected = TopologyException.class)
+ @Test
public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
- groupedStream
- .windowedBy(SessionWindows.with(ofMillis(10)))
- .aggregate(
- MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- (aggKey, aggOne, aggTwo) -> null,
- Materialized.as(INVALID_STORE_NAME));
+ assertThrows(TopologyException.class, () -> groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(10)))
+ .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (aggKey, aggOne, aggTwo) -> null, Materialized.as(INVALID_STORE_NAME))
+ );
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
- groupedStream.reduce(MockReducer.STRING_ADDER, null);
+ assertThrows(NullPointerException.class, () -> groupedStream.reduce(MockReducer.STRING_ADDER, null));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
- groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null);
+ assertThrows(NullPointerException.class, () -> groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
- groupedStream.count((Materialized<String, Long, KeyValueStore<Bytes, byte[]>>) null);
+ assertThrows(NullPointerException.class, () -> groupedStream.count((Materialized<String, Long, KeyValueStore<Bytes, byte[]>>) null));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
new file mode 100644
index 0000000..6bcb52c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -0,0 +1,692 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessor;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class KStreamSlidingWindowAggregateTest {
+ private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+ private final String threadId = Thread.currentThread().getName();
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAggregateSmallInput() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KTable<Windowed<String>, String> table = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+ );
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ table.toStream().process(supplier);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+ inputTopic.pipeInput("A", "1", 10L);
+ inputTopic.pipeInput("A", "2", 15L);
+ inputTopic.pipeInput("A", "3", 20L);
+ inputTopic.pipeInput("A", "4", 22L);
+ inputTopic.pipeInput("A", "5", 30L);
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+ for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
+ final Windowed<String> window = entry.key();
+ final Long start = window.window().start();
+ final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ // A window may be emitted several times as records arrive; a plain put keeps
+ // only its latest result, which is what the expectations below describe.
+ actual.put(start, valueAndTimestamp);
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+ expected.put(0L, ValueAndTimestamp.make("0+1", 10L));
+ expected.put(5L, ValueAndTimestamp.make("0+1+2", 15L));
+ expected.put(10L, ValueAndTimestamp.make("0+1+2+3", 20L));
+ expected.put(11L, ValueAndTimestamp.make("0+2+3", 20L));
+ expected.put(12L, ValueAndTimestamp.make("0+2+3+4", 22L));
+ expected.put(16L, ValueAndTimestamp.make("0+3+4", 22L));
+ expected.put(20L, ValueAndTimestamp.make("0+3+4+5", 30L));
+ expected.put(21L, ValueAndTimestamp.make("0+4+5", 30L));
+ expected.put(23L, ValueAndTimestamp.make("0+5", 30L));
+
+ assertEquals(expected, actual);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReduceSmallInput() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KTable<Windowed<String>, String> table = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+ .reduce(
+ MockReducer.STRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+ );
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ table.toStream().process(supplier);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+ inputTopic.pipeInput("A", "1", 10L);
+ inputTopic.pipeInput("A", "2", 14L);
+ inputTopic.pipeInput("A", "3", 15L);
+ inputTopic.pipeInput("A", "4", 22L);
+ inputTopic.pipeInput("A", "5", 26L);
+ inputTopic.pipeInput("A", "6", 30L);
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+ for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
+ final Windowed<String> window = entry.key();
+ final Long start = window.window().start();
+ final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ // A window may be emitted several times as records arrive; a plain put keeps
+ // only its latest result, which is what the expectations below describe.
+ actual.put(start, valueAndTimestamp);
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+ expected.put(0L, ValueAndTimestamp.make("1", 10L));
+ expected.put(4L, ValueAndTimestamp.make("1+2", 14L));
+ expected.put(5L, ValueAndTimestamp.make("1+2+3", 15L));
+ expected.put(11L, ValueAndTimestamp.make("2+3", 15L));
+ expected.put(12L, ValueAndTimestamp.make("2+3+4", 22L));
+ expected.put(15L, ValueAndTimestamp.make("3+4", 22L));
+ expected.put(16L, ValueAndTimestamp.make("4+5", 26L));
+ expected.put(20L, ValueAndTimestamp.make("4+5+6", 30L));
+ expected.put(23L, ValueAndTimestamp.make("5+6", 30L));
+ expected.put(27L, ValueAndTimestamp.make("6", 30L));
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testAggregateLargeInput() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic1 = "topic1";
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
+ );
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic1 =
+ driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
+ inputTopic1.pipeInput("A", "1", 10L);
+ inputTopic1.pipeInput("A", "2", 20L);
+ inputTopic1.pipeInput("A", "3", 22L);
+ inputTopic1.pipeInput("A", "4", 15L);
+
+ inputTopic1.pipeInput("B", "1", 12L);
+ inputTopic1.pipeInput("B", "2", 13L);
+ inputTopic1.pipeInput("B", "3", 18L);
+ inputTopic1.pipeInput("B", "4", 19L);
+ inputTopic1.pipeInput("B", "5", 25L);
+ inputTopic1.pipeInput("B", "6", 14L);
+
+ inputTopic1.pipeInput("C", "1", 11L);
+ inputTopic1.pipeInput("C", "2", 15L);
+ inputTopic1.pipeInput("C", "3", 16L);
+ inputTopic1.pipeInput("C", "4", 21L);
+ inputTopic1.pipeInput("C", "5", 23L);
+
+ inputTopic1.pipeInput("D", "4", 11L);
+ inputTopic1.pipeInput("D", "2", 12L);
+ inputTopic1.pipeInput("D", "3", 29L);
+ inputTopic1.pipeInput("D", "5", 16L);
+ }
+
+ assertEquals(
+ asList(
+ // FINAL WINDOW: A@10 left window created when A@10 processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 10),
+ // A@10 right window created when A@20 processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2", 20),
+ // A@20 left window created when A@20 processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2", 20),
+ // FINAL WINDOW: A@20 right window created when A@22 processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(21, 31)), "0+3", 22),
+ // A@22 left window created when A@22 processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3", 22),
+ // FINAL WINDOW: A@20 left window updated when A@15 processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2+4", 20),
+ // FINAL WINDOW: A@10 right window updated when A@15 processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2+4", 20),
+ // FINAL WINDOW: A@22 left window updated when A@15 processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3+4", 22),
+ // FINAL WINDOW: A@15 left window created when A@15 processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+4", 15),
+ // FINAL WINDOW: A@15 right window created when A@15 processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(16, 26)), "0+2+3", 22),
+
+ // FINAL WINDOW: B@12 left window created when B@12 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(2, 12)), "0+1", 12),
+ // B@12 right window created when B@13 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2", 13),
+ // FINAL WINDOW: B@13 left window created when B@13 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(3, 13)), "0+1+2", 13),
+ // B@12 right window updated when B@18 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3", 18),
+ // B@13 right window created when B@18 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3", 18),
+ // B@18 left window created when B@18 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3", 18),
+ // B@12 right window updated when B@19 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4", 19),
+ // B@13 right window updated when B@19 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4", 19),
+ // B@18 right window created when B@19 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4", 19),
+ // B@19 left window created when B@19 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4", 19),
+ // FINAL WINDOW: B@18 right window updated when B@25 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4+5", 25),
+ // FINAL WINDOW: B@19 right window created when B@25 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)), "0+5", 25),
+ // FINAL WINDOW: B@25 left window created when B@25 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15, 25)), "0+3+4+5", 25),
+ // FINAL WINDOW: B@18 left window updated when B@14 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3+6", 18),
+ // FINAL WINDOW: B@19 left window updated when B@14 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4+6", 19),
+ // FINAL WINDOW: B@12 right window updated when B@14 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4+6", 19),
+ // FINAL WINDOW: B@13 right window updated when B@14 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4+6", 19),
+ // FINAL WINDOW: B@14 left window created when B@14 processed
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(4, 14)), "0+1+2+6", 14),
+
+ // FINAL WINDOW: C@11 left window created when C@11 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(1, 11)), "0+1", 11),
+ // C@11 right window created when C@15 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2", 15),
+ // FINAL WINDOW: C@15 left window created when C@15 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+1+2", 15),
+ // C@11 right window updated when C@16 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3", 16),
+ // C@15 right window created when C@16 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3", 16),
+ // FINAL WINDOW: C@16 left window created when C@16 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(6, 16)), "0+1+2+3", 16),
+ // FINAL WINDOW: C@11 right window updated when C@21 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3+4", 21),
+ // C@15 right window updated when C@21 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4", 21),
+ // C@16 right window created when C@21 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4", 21),
+ // FINAL WINDOW: C@21 left window created when C@21 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(11, 21)), "0+1+2+3+4", 21),
+ // FINAL WINDOW: C@15 right window updated when C@23 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4+5", 23),
+ // FINAL WINDOW: C@16 right window updated when C@23 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4+5", 23),
+ // FINAL WINDOW: C@21 right window created when C@23 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(22, 32)), "0+5", 23),
+ // FINAL WINDOW: C@23 left window created when C@23 processed
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+2+3+4+5", 23),
+
+ // FINAL WINDOW: D@11 left window created when D@11 processed
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(1, 11)), "0+4", 11),
+ // D@11 right window created when D@12 processed
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2", 12),
+ // FINAL WINDOW: D@12 left window created when D@12 processed
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(2, 12)), "0+4+2", 12),
+ // FINAL WINDOW: D@29 left window created when D@29 processed
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(19, 29)), "0+3", 29),
+ // FINAL WINDOW: D@11 right window updated when D@16 processed
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2+5", 16),
+ // FINAL WINDOW: D@12 right window created when D@16 processed
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(13, 23)), "0+5", 16),
+ // FINAL WINDOW: D@16 left window created when D@16 processed
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(6, 16)), "0+4+2+5", 16)
+ ),
+ supplier.theCapturedProcessor().processed()
+ );
+ }
+
+ @Test
+ public void testJoin() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
+
+ final KTable<Windowed<String>, String> table1 = builder
+ .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
+ );
+
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ table1.toStream().process(supplier);
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String())
+ );
+ table2.toStream().process(supplier);
+
+ table1.join(table2, (p1, p2) -> p1 + "%" + p2).toStream().process(supplier);
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic1 =
+ driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
+ final TestInputTopic<String, String> inputTopic2 =
+ driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer());
+ inputTopic1.pipeInput("A", "1", 10L);
+ inputTopic1.pipeInput("B", "2", 11L);
+ inputTopic1.pipeInput("C", "3", 12L);
+
+ final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
+
+ processors.get(0).checkAndClearProcessResult(
+ // left windows created by the first set of records to table 1
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 10),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(1, 11)), "0+2", 11),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2, 12)), "0+3", 12)
+ );
+ processors.get(1).checkAndClearProcessResult();
+ processors.get(2).checkAndClearProcessResult();
+
+ inputTopic1.pipeInput("A", "1", 15L);
+ inputTopic1.pipeInput("B", "2", 16L);
+ inputTopic1.pipeInput("C", "3", 19L);
+
+ processors.get(0).checkAndClearProcessResult(
+ // right windows from previous records are created, and left windows from new records to table 1
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+1", 15),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1", 15),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(12, 22)), "0+2", 16),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6, 16)), "0+2+2", 16),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+3", 19),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(9, 19)), "0+3+3", 19)
+ );
+ processors.get(1).checkAndClearProcessResult();
+ processors.get(2).checkAndClearProcessResult();
+
+ inputTopic2.pipeInput("A", "a", 10L);
+ inputTopic2.pipeInput("B", "b", 30L);
+ inputTopic2.pipeInput("C", "c", 12L);
+ inputTopic2.pipeInput("C", "c", 35L);
+
+ // B@30 and C@35 open windows ([20,30] and [25,35]) that table 1 never emitted, so only A@10 and C@12 join below
+ processors.get(0).checkAndClearProcessResult();
+ processors.get(1).checkAndClearProcessResult(
+ // left windows from first set of records sent to table 2
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 10),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)), "0+b", 30),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2, 12)), "0+c", 12),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25, 35)), "0+c", 35)
+ );
+ processors.get(2).checkAndClearProcessResult(
+ // set of join windows from windows created by table 1 and table 2
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1%0+a", 10),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2, 12)), "0+3%0+c", 12)
+ );
+
+ inputTopic2.pipeInput("A", "a", 15L);
+ inputTopic2.pipeInput("B", "b", 16L);
+ inputTopic2.pipeInput("C", "c", 17L);
+
+ processors.get(0).checkAndClearProcessResult();
+ processors.get(1).checkAndClearProcessResult(
+ // right windows from previous records are created (where applicable), and left windows from new records to table 2
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+a", 15),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a+a", 15),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6, 16)), "0+b", 16),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+c", 17),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(7, 17)), "0+c+c", 17)
+ );
+ processors.get(2).checkAndClearProcessResult(
+ // set of join windows from windows created by table 1 and table 2
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+1%0+a", 15),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1%0+a+a", 15),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6, 16)), "0+2+2%0+b", 16),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+3%0+c", 19)
+ );
+ }
+ }
+
+ @Test
+ public void shouldLogAndMeterWhenSkippingNullKey() {
+ final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+ builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+ .aggregate(MockInitializer.STRING_INIT, MockAggregator.toStringInstance("+"), Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()));
+
+ props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+ // NOTE(review): only the skip log line is asserted below; no metric is checked despite the "AndMeter" name
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSlidingWindowAggregate.class);
+ final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+ inputTopic.pipeInput(null, "1");
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. value=[1] topic=[topic] partition=[0] offset=[0]"));
+ }
+ }
+
+ @Test
+ public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
+ final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
+ stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(90L)))
+ .aggregate(
+ () -> "",
+ MockAggregator.toStringInstance("+"),
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+ )
+ .toStream()
+ .map((key, value) -> new KeyValue<>(key.toString(), value))
+ .to("output");
+
+ props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSlidingWindowAggregate.class);
+ final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ // k@200 is piped first so stream time is 200; every later record is older and is expected to be logged as an expired window
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+ inputTopic.pipeInput("k", "100", 200L);
+ inputTopic.pipeInput("k", "0", 100L);
+ inputTopic.pipeInput("k", "1", 101L);
+ inputTopic.pipeInput("k", "2", 102L);
+ inputTopic.pipeInput("k", "3", 103L);
+ inputTopic.pipeInput("k", "4", 104L);
+ inputTopic.pipeInput("k", "5", 105L);
+ inputTopic.pipeInput("k", "6", 15L);
+
+ assertLatenessMetrics(driver, is(7.0), is(185.0), is(96.25));
+
+ assertThat(appender.getMessages(), hasItems(
+ // left window for k@100
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[90,100] expiration=[110] streamTime=[200]",
+ // left window for k@101
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[91,101] expiration=[110] streamTime=[200]",
+ // left window for k@102
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[92,102] expiration=[110] streamTime=[200]",
+ // left window for k@103
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[93,103] expiration=[110] streamTime=[200]",
+ // left window for k@104
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[94,104] expiration=[110] streamTime=[200]",
+ // left window for k@105
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[95,105] expiration=[110] streamTime=[200]",
+ // left window for k@15
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] streamTime=[200]"
+ ));
+ final TestOutputTopic<String, String> outputTopic =
+ driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
+ assertThat(outputTopic.readRecord(), equalTo(new TestRecord<>("[k@190/200]", "+100", null, 200L)));
+ assertTrue(outputTopic.isEmpty());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAggregateRandomInput() {
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic1 = "topic1";
+
+ final KTable<Windowed<String>, String> table = builder
+ .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(10000)))
+ // The aggregator needs to sort the strings so the window value is the same for the final windows even when
+ // records are processed in a different order. Here, we sort alphabetically.
+ .aggregate(
+ () -> "",
+ (key, value, aggregate) -> {
+ aggregate += value;
+ final char[] ch = aggregate.toCharArray();
+ Arrays.sort(ch);
+ aggregate = String.valueOf(ch);
+ return aggregate;
+ },
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
+ );
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ table.toStream().process(supplier);
+ final long seed = new Random().nextLong();
+ final Random shuffle = new Random(seed);
+
+ try {
+
+ final List<ValueAndTimestamp<String>> input = Arrays.asList(
+ ValueAndTimestamp.make("A", 10L),
+ ValueAndTimestamp.make("B", 15L),
+ ValueAndTimestamp.make("C", 16L),
+ ValueAndTimestamp.make("D", 18L),
+ ValueAndTimestamp.make("E", 30L),
+ ValueAndTimestamp.make("F", 40L),
+ ValueAndTimestamp.make("G", 55L),
+ ValueAndTimestamp.make("H", 56L),
+ ValueAndTimestamp.make("I", 58L),
+ ValueAndTimestamp.make("J", 58L),
+ ValueAndTimestamp.make("K", 62L),
+ ValueAndTimestamp.make("L", 63L),
+ ValueAndTimestamp.make("M", 63L),
+ ValueAndTimestamp.make("N", 63L),
+ ValueAndTimestamp.make("O", 76L),
+ ValueAndTimestamp.make("P", 77L),
+ ValueAndTimestamp.make("Q", 80L)
+ );
+
+ Collections.shuffle(input, shuffle);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic1 =
+ driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
+ for (int i = 0; i < input.size(); i++) {
+ inputTopic1.pipeInput("A", input.get(i).value(), input.get(i).timestamp());
+ }
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> results = new HashMap<>();
+
+ for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
+ final Windowed<String> window = entry.key();
+ final Long start = window.window().start();
+ final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ if (results.putIfAbsent(start, valueAndTimestamp) != null) {
+ results.replace(start, valueAndTimestamp);
+ }
+ }
+ verifyRandomTestResults(results);
+ } catch (final AssertionError t) {
+ throw new AssertionError(
+ "Assertion failed in randomized test. Reproduce with seed: " + seed + ".",
+ t
+ );
+ } catch (final Throwable t) {
+ final StringBuilder sb =
+ new StringBuilder()
+ .append("Exception in randomized scenario. Reproduce with seed: ")
+ .append(seed)
+ .append(".");
+ throw new AssertionError(sb.toString(), t);
+ }
+ }
+
+    /**
+     * Asserts that the final per-window results of the randomized test equal the fixed expectation.
+     *
+     * @param actual last observed value and timestamp, keyed by window start
+     */
+    private void verifyRandomTestResults(final Map<Long, ValueAndTimestamp<String>> actual) {
+        // window start -> (sorted aggregate, timestamp) expected once all input has been processed
+        final Map<Long, ValueAndTimestamp<String>> expectedByWindowStart = new HashMap<>();
+        expectedByWindowStart.put(0L, ValueAndTimestamp.make("A", 10L));
+        expectedByWindowStart.put(5L, ValueAndTimestamp.make("AB", 15L));
+        expectedByWindowStart.put(6L, ValueAndTimestamp.make("ABC", 16L));
+        expectedByWindowStart.put(8L, ValueAndTimestamp.make("ABCD", 18L));
+        expectedByWindowStart.put(11L, ValueAndTimestamp.make("BCD", 18L));
+        expectedByWindowStart.put(16L, ValueAndTimestamp.make("CD", 18L));
+        expectedByWindowStart.put(17L, ValueAndTimestamp.make("D", 18L));
+        expectedByWindowStart.put(20L, ValueAndTimestamp.make("E", 30L));
+        expectedByWindowStart.put(30L, ValueAndTimestamp.make("EF", 40L));
+        expectedByWindowStart.put(31L, ValueAndTimestamp.make("F", 40L));
+        expectedByWindowStart.put(45L, ValueAndTimestamp.make("G", 55L));
+        expectedByWindowStart.put(46L, ValueAndTimestamp.make("GH", 56L));
+        expectedByWindowStart.put(48L, ValueAndTimestamp.make("GHIJ", 58L));
+        expectedByWindowStart.put(52L, ValueAndTimestamp.make("GHIJK", 62L));
+        expectedByWindowStart.put(53L, ValueAndTimestamp.make("GHIJKLMN", 63L));
+        expectedByWindowStart.put(56L, ValueAndTimestamp.make("HIJKLMN", 63L));
+        expectedByWindowStart.put(57L, ValueAndTimestamp.make("IJKLMN", 63L));
+        expectedByWindowStart.put(59L, ValueAndTimestamp.make("KLMN", 63L));
+        expectedByWindowStart.put(63L, ValueAndTimestamp.make("LMN", 63L));
+        expectedByWindowStart.put(66L, ValueAndTimestamp.make("O", 76L));
+        expectedByWindowStart.put(67L, ValueAndTimestamp.make("OP", 77L));
+        expectedByWindowStart.put(70L, ValueAndTimestamp.make("OPQ", 80L));
+        expectedByWindowStart.put(77L, ValueAndTimestamp.make("PQ", 80L));
+        expectedByWindowStart.put(78L, ValueAndTimestamp.make("Q", 80L));
+
+        assertEquals(expectedByWindowStart, actual);
+    }
+
+ private void assertLatenessMetrics(final TopologyTestDriver driver,
+ final Matcher<Object> dropTotal,
+ final Matcher<Object> maxLateness,
+ final Matcher<Object> avgLateness) {
+
+ final MetricName dropTotalMetric;
+ final MetricName dropRateMetric;
+ final MetricName latenessMaxMetric;
+ final MetricName latenessAvgMetric;
+ dropTotalMetric = new MetricName(
+ "dropped-records-total",
+ "stream-task-metrics",
+ "The total number of dropped records",
+ mkMap(
+ mkEntry("thread-id", threadId),
+ mkEntry("task-id", "0_0")
+ )
+ );
+ dropRateMetric = new MetricName(
+ "dropped-records-rate",
+ "stream-task-metrics",
+ "The average number of dropped records per second",
+ mkMap(
+ mkEntry("thread-id", threadId),
+ mkEntry("task-id", "0_0")
+ )
+ );
+ latenessMaxMetric = new MetricName(
+ "record-lateness-max",
+ "stream-task-metrics",
+ "The observed maximum lateness of records in milliseconds, measured by comparing the record "
+ + "timestamp with the current stream time",
+ mkMap(
+ mkEntry("thread-id", threadId),
+ mkEntry("task-id", "0_0")
+ )
+ );
+ latenessAvgMetric = new MetricName(
+ "record-lateness-avg",
+ "stream-task-metrics",
+ "The observed average lateness of records in milliseconds, measured by comparing the record "
+ + "timestamp with the current stream time",
+ mkMap(
+ mkEntry("thread-id", threadId),
+ mkEntry("task-id", "0_0")
+ )
+ );
+ assertThat(driver.metrics().get(dropTotalMetric).metricValue(), dropTotal);
+ assertThat(driver.metrics().get(dropRateMetric).metricValue(), not(0.0));
+ assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
+ assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
new file mode 100644
index 0000000..41ac960
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import static java.time.Duration.ofMillis;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import java.util.Properties;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.CogroupedKStream;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for cogrouped streams windowed with {@link SlidingWindows}: null-argument checks of the
+ * {@code aggregate} overloads, processor naming, and the records emitted by the aggregation.
+ */
+public class SlidingWindowedCogroupedKStreamImplTest {
+
+    private static final String TOPIC = "topic";
+    private static final String TOPIC2 = "topic2";
+    private static final String OUTPUT = "output";
+    private final StreamsBuilder builder = new StreamsBuilder();
+
+    private KGroupedStream<String, String> groupedStream;
+
+    private KGroupedStream<String, String> groupedStream2;
+    private CogroupedKStream<String, String> cogroupedStream;
+    private TimeWindowedCogroupedKStream<String, String> windowedCogroupedStream;
+
+    private final Properties props = StreamsTestUtils
+        .getStreamsConfig(Serdes.String(), Serdes.String());
+
+    /** Cogroups two String topics and applies a 500 ms sliding window with 2000 ms grace. */
+    @Before
+    public void setup() {
+        final KStream<String, String> firstSource =
+            builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> secondSource =
+            builder.stream(TOPIC2, Consumed.with(Serdes.String(), Serdes.String()));
+
+        groupedStream = firstSource.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
+        groupedStream2 = secondSource.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
+        cogroupedStream = groupedStream
+            .cogroup(MockAggregator.TOSTRING_ADDER)
+            .cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
+        windowedCogroupedStream = cogroupedStream.windowedBy(
+            SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)));
+    }
+
+    @Test
+    public void shouldNotHaveNullInitializerOnAggregate() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedCogroupedStream.aggregate(null));
+    }
+
+    @Test
+    public void shouldNotHaveNullMaterializedOnTwoOptionAggregate() {
+        // The cast selects the (Initializer, Materialized) overload.
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedCogroupedStream.aggregate(
+                MockInitializer.STRING_INIT,
+                (Materialized<String, String, WindowStore<Bytes, byte[]>>) null));
+    }
+
+    @Test
+    public void shouldNotHaveNullNamedTwoOptionOnAggregate() {
+        // The cast selects the (Initializer, Named) overload.
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, (Named) null));
+    }
+
+    @Test
+    public void shouldNotHaveNullInitializerTwoOptionNamedOnAggregate() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedCogroupedStream.aggregate(null, Named.as("test")));
+    }
+
+    @Test
+    public void shouldNotHaveNullInitializerTwoOptionMaterializedOnAggregate() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedCogroupedStream.aggregate(null, Materialized.as("test")));
+    }
+
+    @Test
+    public void shouldNotHaveNullInitializerThreeOptionOnAggregate() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedCogroupedStream.aggregate(null, Named.as("test"), Materialized.as("test")));
+    }
+
+    @Test
+    public void shouldNotHaveNullMaterializedOnAggregate() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, Named.as("Test"), null));
+    }
+
+    @Test
+    public void shouldNotHaveNullNamedOnAggregate() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("test")));
+    }
+
+    /** The name passed via {@link Named} must show up as prefix of the generated processor names. */
+    @Test
+    public void namedParamShouldSetName() {
+        // A dedicated builder keeps the described topology free of the sources added in setup().
+        final StreamsBuilder namedBuilder = new StreamsBuilder();
+        final KStream<String, String> source =
+            namedBuilder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        groupedStream = source.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
+        groupedStream
+            .cogroup(MockAggregator.TOSTRING_ADDER)
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
+            .aggregate(MockInitializer.STRING_INIT, Named.as("foo"));
+
+        final String expectedDescription =
+            "Topologies:\n" +
+            " Sub-topology: 0\n" +
+            " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" +
+            " --> foo-cogroup-agg-0\n" +
+            " Processor: foo-cogroup-agg-0 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000001])\n" +
+            " --> foo-cogroup-merge\n" +
+            " <-- KSTREAM-SOURCE-0000000000\n" +
+            " Processor: foo-cogroup-merge (stores: [])\n" +
+            " --> none\n" +
+            " <-- foo-cogroup-agg-0\n\n";
+
+        assertThat(namedBuilder.build().describe().toString(), equalTo(expectedDescription));
+    }
+
+    /** Records arriving close together: checks every emitted window update in emission order. */
+    @Test
+    public void slidingWindowAggregateTestStreamsTest() {
+        final KTable<Windowed<String>, String> aggregated = windowedCogroupedStream.aggregate(
+            MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String()));
+        aggregated.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> input = driver.createInputTopic(
+                TOPIC, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<Windowed<String>, String> output = driver.createOutputTopic(
+                OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());
+
+            input.pipeInput("k1", "A", 500);
+            input.pipeInput("k2", "A", 500);
+            input.pipeInput("k2", "A", 501);
+            input.pipeInput("k1", "A", 502);
+            input.pipeInput("k1", "B", 503);
+            input.pipeInput("k2", "B", 503);
+            input.pipeInput("k2", "B", 504);
+            input.pipeInput("k1", "B", 504);
+
+            assertOutputKeyValueTimestamp(output, "k1", "0+A", 500);
+            assertOutputKeyValueTimestamp(output, "k2", "0+A", 500);
+            assertOutputKeyValueTimestamp(output, "k2", "0+A", 501);
+            assertOutputKeyValueTimestamp(output, "k2", "0+A+A", 501);
+            assertOutputKeyValueTimestamp(output, "k1", "0+A", 502);
+            assertOutputKeyValueTimestamp(output, "k1", "0+A+A", 502);
+            assertOutputKeyValueTimestamp(output, "k1", "0+A+B", 503);
+            assertOutputKeyValueTimestamp(output, "k1", "0+B", 503);
+            assertOutputKeyValueTimestamp(output, "k1", "0+A+A+B", 503);
+            assertOutputKeyValueTimestamp(output, "k2", "0+A+B", 503);
+            assertOutputKeyValueTimestamp(output, "k2", "0+B", 503);
+            assertOutputKeyValueTimestamp(output, "k2", "0+A+A+B", 503);
+            assertOutputKeyValueTimestamp(output, "k2", "0+A+B+B", 504);
+            assertOutputKeyValueTimestamp(output, "k2", "0+B+B", 504);
+            assertOutputKeyValueTimestamp(output, "k2", "0+B", 504);
+            assertOutputKeyValueTimestamp(output, "k2", "0+A+A+B+B", 504);
+            assertOutputKeyValueTimestamp(output, "k1", "0+A+B+B", 504);
+            assertOutputKeyValueTimestamp(output, "k1", "0+B+B", 504);
+            assertOutputKeyValueTimestamp(output, "k1", "0+B", 504);
+            assertOutputKeyValueTimestamp(output, "k1", "0+A+A+B+B", 504);
+        }
+    }
+
+    /** Records 250 ms apart with a 500 ms window: checks left and right window updates per record. */
+    @Test
+    public void slidingWindowAggregateOverlappingWindowsTest() {
+        final KTable<Windowed<String>, String> aggregated = groupedStream
+            .cogroup(MockAggregator.TOSTRING_ADDER)
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
+            .aggregate(MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String()));
+        aggregated.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> input = driver.createInputTopic(
+                TOPIC, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<Windowed<String>, String> output = driver.createOutputTopic(
+                OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());
+
+            input.pipeInput("k1", "A", 500);
+            input.pipeInput("k2", "A", 500);
+            input.pipeInput("k1", "B", 750);
+            input.pipeInput("k2", "B", 750);
+            input.pipeInput("k2", "A", 1000L);
+            input.pipeInput("k1", "A", 1000L);
+
+            // left window k1@500
+            assertOutputKeyValueTimestamp(output, "k1", "0+A", 500);
+            // left window k2@500
+            assertOutputKeyValueTimestamp(output, "k2", "0+A", 500);
+            // right window k1@500
+            assertOutputKeyValueTimestamp(output, "k1", "0+B", 750);
+            // left window k1@750
+            assertOutputKeyValueTimestamp(output, "k1", "0+A+B", 750);
+            // right window k2@500
+            assertOutputKeyValueTimestamp(output, "k2", "0+B", 750);
+            // left window k2@750
+            assertOutputKeyValueTimestamp(output, "k2", "0+A+B", 750);
+            // right window k2@500 update
+            assertOutputKeyValueTimestamp(output, "k2", "0+B+A", 1000);
+            // right window k2@750
+            assertOutputKeyValueTimestamp(output, "k2", "0+A", 1000);
+            // left window k2@1000
+            assertOutputKeyValueTimestamp(output, "k2", "0+A+B+A", 1000);
+            // right window k1@500 update
+            assertOutputKeyValueTimestamp(output, "k1", "0+B+A", 1000);
+            // right window k1@750
+            assertOutputKeyValueTimestamp(output, "k1", "0+A", 1000);
+            // left window k1@1000
+            assertOutputKeyValueTimestamp(output, "k1", "0+A+B+A", 1000);
+        }
+    }
+
+    /**
+     * Reads the next output record and compares its plain key, value and timestamp with the
+     * expectation. The window of the record key is not part of the comparison.
+     */
+    private void assertOutputKeyValueTimestamp(final TestOutputTopic<Windowed<String>, String> outputTopic,
+                                               final String expectedKey,
+                                               final String expectedValue,
+                                               final long expectedTimestamp) {
+        final TestRecord<Windowed<String>, String> actual = outputTopic.readRecord();
+        final TestRecord<String, String> actualWithoutWindow =
+            new TestRecord<>(actual.getKey().key(), actual.getValue(), null, actual.timestamp());
+        final TestRecord<String, String> expected =
+            new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp);
+        assertThat(actualWithoutWindow, equalTo(expected));
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
new file mode 100644
index 0000000..d6b26bf
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class SlidingWindowedKStreamImplTest {
+
+ private static final String TOPIC = "input";
+ private final StreamsBuilder builder = new StreamsBuilder();
+ private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+ private TimeWindowedKStream<String, String> windowedStream;
+
+    /** Builds the windowed stream under test: sliding windows of 100 ms with 1000 ms grace. */
+    @Before
+    public void before() {
+        windowedStream = builder
+            .stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L)));
+    }
+
+    /** Counts per sliding window and checks the last count and timestamp captured for each window. */
+    @Test
+    public void shouldCountSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        windowedStream
+            .count()
+            .toStream()
+            .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+
+        // windowed key -> last (count, timestamp) expected for that window
+        final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> expectations = Arrays.asList(
+            KeyValue.pair(new Windowed<>("1", new TimeWindow(0L, 100L)), ValueAndTimestamp.make(1L, 100L)),
+            KeyValue.pair(new Windowed<>("1", new TimeWindow(101L, 201L)), ValueAndTimestamp.make(1L, 150L)),
+            KeyValue.pair(new Windowed<>("1", new TimeWindow(50L, 150L)), ValueAndTimestamp.make(2L, 150L)),
+            KeyValue.pair(new Windowed<>("1", new TimeWindow(400L, 500L)), ValueAndTimestamp.make(1L, 500L)),
+            KeyValue.pair(new Windowed<>("2", new TimeWindow(100L, 200L)), ValueAndTimestamp.make(2L, 200L)),
+            KeyValue.pair(new Windowed<>("2", new TimeWindow(50L, 150L)), ValueAndTimestamp.make(1L, 150L)),
+            KeyValue.pair(new Windowed<>("2", new TimeWindow(151L, 251L)), ValueAndTimestamp.make(1L, 200L))
+        );
+
+        for (final KeyValue<Windowed<String>, ValueAndTimestamp<Long>> expectation : expectations) {
+            assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(expectation.key),
+                equalTo(expectation.value));
+        }
+    }
+
+    /** Reduces per sliding window and checks the last value and timestamp captured for each window. */
+    @Test
+    public void shouldReduceSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        windowedStream
+            .reduce(MockReducer.STRING_ADDER)
+            .toStream()
+            .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+
+        // windowed key -> last (reduced value, timestamp) expected for that window
+        final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> expectations = Arrays.asList(
+            KeyValue.pair(new Windowed<>("1", new TimeWindow(0L, 100L)), ValueAndTimestamp.make("1", 100L)),
+            KeyValue.pair(new Windowed<>("1", new TimeWindow(101L, 201L)), ValueAndTimestamp.make("2", 150L)),
+            KeyValue.pair(new Windowed<>("1", new TimeWindow(50L, 150L)), ValueAndTimestamp.make("1+2", 150L)),
+            KeyValue.pair(new Windowed<>("1", new TimeWindow(400L, 500L)), ValueAndTimestamp.make("3", 500L)),
+            KeyValue.pair(new Windowed<>("2", new TimeWindow(100L, 200L)), ValueAndTimestamp.make("10+20", 200L)),
+            KeyValue.pair(new Windowed<>("2", new TimeWindow(50L, 150L)), ValueAndTimestamp.make("20", 150L)),
+            KeyValue.pair(new Windowed<>("2", new TimeWindow(151L, 251L)), ValueAndTimestamp.make("10", 200L))
+        );
+
+        for (final KeyValue<Windowed<String>, ValueAndTimestamp<String>> expectation : expectations) {
+            assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(expectation.key),
+                equalTo(expectation.value));
+        }
+    }
+
+    /** Aggregates per sliding window and checks the last value and timestamp captured for each window. */
+    @Test
+    public void shouldAggregateSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        windowedStream
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.with(Serdes.String(), Serdes.String()))
+            .toStream()
+            .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+
+        // windowed key -> last (aggregate, timestamp) expected for that window
+        final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> expectations = Arrays.asList(
+            KeyValue.pair(new Windowed<>("1", new TimeWindow(0L, 100L)), ValueAndTimestamp.make("0+1", 100L)),
+            KeyValue.pair(new Windowed<>("1", new TimeWindow(101L, 201L)), ValueAndTimestamp.make("0+2", 150L)),
+            KeyValue.pair(new Windowed<>("1", new TimeWindow(50L, 150L)), ValueAndTimestamp.make("0+1+2", 150L)),
+            KeyValue.pair(new Windowed<>("1", new TimeWindow(400L, 500L)), ValueAndTimestamp.make("0+3", 500L)),
+            KeyValue.pair(new Windowed<>("2", new TimeWindow(100L, 200L)), ValueAndTimestamp.make("0+10+20", 200L)),
+            KeyValue.pair(new Windowed<>("2", new TimeWindow(50L, 150L)), ValueAndTimestamp.make("0+20", 150L)),
+            KeyValue.pair(new Windowed<>("2", new TimeWindow(151L, 251L)), ValueAndTimestamp.make("0+10", 200L))
+        );
+
+        for (final KeyValue<Windowed<String>, ValueAndTimestamp<String>> expectation : expectations) {
+            assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(expectation.key),
+                equalTo(expectation.value));
+        }
+    }
+
+    /**
+     * Materializes the windowed count into "count-store" and reads it back both as a plain
+     * window store and as a timestamped window store.
+     */
+    @Test
+    public void shouldMaterializeCount() {
+        windowedStream.count(
+            Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.Long()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+
+            // Plain view: values only.
+            final WindowStore<String, Long> plainStore = driver.getWindowStore("count-store");
+            final List<KeyValue<Windowed<String>, Long>> counts =
+                StreamsTestUtils.toList(plainStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+            assertThat(counts, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), 1L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), 2L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), 1L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), 1L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), 1L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), 2L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), 1L))));
+
+            // Timestamped view: the same windows, each value paired with its timestamp.
+            final WindowStore<String, ValueAndTimestamp<Long>> timestampedStore =
+                driver.getTimestampedWindowStore("count-store");
+            final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> timestampedCounts =
+                StreamsTestUtils.toList(timestampedStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+            assertThat(timestampedCounts, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), ValueAndTimestamp.make(1L, 100L)),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), ValueAndTimestamp.make(2L, 150L)),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), ValueAndTimestamp.make(1L, 150L)),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), ValueAndTimestamp.make(1L, 500L)),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), ValueAndTimestamp.make(1L, 150L)),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), ValueAndTimestamp.make(2L, 200L)),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), ValueAndTimestamp.make(1L, 200L)))));
+        }
+    }
+
+    /**
+     * Materializes the windowed reduce result into "reduced" and reads it back both as a plain
+     * window store and as a timestamped window store.
+     */
+    @Test
+    public void shouldMaterializeReduced() {
+        windowedStream.reduce(
+            MockReducer.STRING_ADDER,
+            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            {
+                final WindowStore<String, String> windowStore = driver.getWindowStore("reduced");
+                final List<KeyValue<Windowed<String>, String>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), "1"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), "1+2"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), "2"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), "3"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), "20"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), "10+20"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), "10"))));
+            }
+            {
+                // The store holds String values, so the timestamped view is typed
+                // ValueAndTimestamp<String>, matching the expected entries below.
+                final WindowStore<String, ValueAndTimestamp<String>> windowStore =
+                    driver.getTimestampedWindowStore("reduced");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), ValueAndTimestamp.make("1", 100L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), ValueAndTimestamp.make("1+2", 150L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), ValueAndTimestamp.make("2", 150L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), ValueAndTimestamp.make("3", 500L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), ValueAndTimestamp.make("20", 150L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), ValueAndTimestamp.make("10+20", 200L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), ValueAndTimestamp.make("10", 200L)))));
+            }
+        }
+    }
+
+    /**
+     * Verifies that {@code aggregate} with a named {@code Materialized} makes its results queryable through
+     * both the plain and the timestamped view of the "aggregated" window store.
+     * The expected windows are the ones produced by the records piped in {@code processData}.
+     */
+    @Test
+    public void shouldMaterializeAggregated() {
+        windowedStream.aggregate(
+            MockInitializer.STRING_INIT,
+            MockAggregator.TOSTRING_ADDER,
+            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            {
+                // plain view: window -> aggregated String value
+                final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
+                final List<KeyValue<Windowed<String>, String>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), "0+1"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), "0+1+2"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), "0+2"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), "0+3"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), "0+20"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), "0+10+20"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), "0+10"))));
+            }
+            {
+                // timestamped view: the store holds String aggregates, so the value type is
+                // ValueAndTimestamp<String> (not <Long>), matching the expected entries below
+                final WindowStore<String, ValueAndTimestamp<String>> windowStore =
+                    driver.getTimestampedWindowStore("aggregated");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), ValueAndTimestamp.make("0+1", 100L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), ValueAndTimestamp.make("0+1+2", 150L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), ValueAndTimestamp.make("0+2", 150L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), ValueAndTimestamp.make("0+3", 500L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), ValueAndTimestamp.make("0+20", 150L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), ValueAndTimestamp.make("0+10+20", 200L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), ValueAndTimestamp.make("0+10", 200L)))));
+            }
+        }
+    }
+
+    /** {@code aggregate(initializer, aggregator)} must reject a null initializer with a NullPointerException. */
+    @Test
+    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER)
+        );
+    }
+
+    /** {@code aggregate(initializer, aggregator)} must reject a null aggregator with a NullPointerException. */
+    @Test
+    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedStream.aggregate(MockInitializer.STRING_INIT, null)
+        );
+    }
+
+    /** {@code reduce(reducer)} must reject a null reducer with a NullPointerException. */
+    @Test
+    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedStream.reduce(null)
+        );
+    }
+
+    /** The materialized {@code aggregate} overload must reject a null initializer with a NullPointerException. */
+    @Test
+    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedStream.aggregate(
+                null,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.as("store"))
+        );
+    }
+
+    /** The materialized {@code aggregate} overload must reject a null aggregator with a NullPointerException. */
+    @Test
+    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store"))
+        );
+    }
+
+    /**
+     * The materialized {@code aggregate} overload must reject a null {@code Materialized}.
+     * The raw {@code (Materialized)} cast only selects the overload; it is why the unchecked warning is suppressed.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedStream.aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                (Materialized) null)
+        );
+    }
+
+    /** The materialized {@code reduce} overload must reject a null reducer with a NullPointerException. */
+    @Test
+    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedStream.reduce(null, Materialized.as("store"))
+        );
+    }
+
+    /**
+     * The materialized {@code reduce} overload must reject a null {@code Materialized}.
+     * The raw {@code (Materialized)} cast only selects the overload; it is why the unchecked warning is suppressed.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedStream.reduce(
+                MockReducer.STRING_ADDER,
+                (Materialized) null)
+        );
+    }
+
+    /**
+     * {@code reduce(reducer, named)} must reject a null {@code Named} with a NullPointerException.
+     * NOTE(review): despite "Materialized" in the method name, this exercises the {@code Named} overload;
+     * the name is kept unchanged so existing test reports and filters still match.
+     */
+    @Test
+    public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
+        // (Named) null is a plain, non-generic cast used only to pick the overload, so no
+        // @SuppressWarnings("unchecked") is needed here.
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedStream.reduce(MockReducer.STRING_ADDER, (Named) null)
+        );
+    }
+
+    /** {@code count(materialized)} must reject a null {@code Materialized} with a NullPointerException. */
+    @Test
+    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+        // the fully parameterized cast selects the Materialized overload of count()
+        assertThrows(
+            NullPointerException.class,
+            () -> windowedStream.count((Materialized<String, Long, WindowStore<Bytes, byte[]>>) null)
+        );
+    }
+
+ @Test
+ public void shouldThrowIllegalArgumentWhenRetentionIsTooSmall() {
+ assertThrows(IllegalArgumentException.class, () -> windowedStream
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized
+ .<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String())
+ .withRetention(ofMillis(1L))
+ )
+ );
+ }
+
+    /**
+     * Pipes records for key "1" with timestamps from 100 to 2000 into an in-memory window store created with
+     * {@code ofMillis(1200L)} and {@code ofMillis(100L)} (retention and window size, per
+     * {@code Stores.inMemoryWindowStore}) and checks that only the windows ending at 1000 and 2000
+     * can still be fetched.
+     */
+    @Test
+    public void shouldDropWindowsOutsideOfRetention() {
+        final WindowBytesStoreSupplier storeSupplier =
+            Stores.inMemoryWindowStore("aggregated", ofMillis(1200L), ofMillis(100L), false);
+        windowedStream.aggregate(
+            MockInitializer.STRING_INIT,
+            MockAggregator.TOSTRING_ADDER,
+            Materialized.<String, String>as(storeSupplier)
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String())
+                .withCachingDisabled());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
+
+            inputTopic.pipeInput("1", "2", 100L);
+            inputTopic.pipeInput("1", "3", 500L);
+            inputTopic.pipeInput("1", "4", 799L);
+            inputTopic.pipeInput("1", "4", 1000L);
+            inputTopic.pipeInput("1", "5", 2000L);
+
+            {
+                // plain view: window -> aggregated String value
+                final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
+                final List<KeyValue<Windowed<String>, String>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "1", ofEpochMilli(0), ofEpochMilli(10000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(900, 1000)), "0+4"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(1900, 2000)), "0+5"))));
+            }
+            {
+                // timestamped view: the store holds String aggregates, so the value type is
+                // ValueAndTimestamp<String> (not <Long>), matching the expected entries below
+                final WindowStore<String, ValueAndTimestamp<String>> windowStore =
+                    driver.getTimestampedWindowStore("aggregated");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "1", ofEpochMilli(0), ofEpochMilli(2000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(900, 1000)), ValueAndTimestamp.make("0+4", 1000L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(1900, 2000)), ValueAndTimestamp.make("0+5", 2000L)))));
+            }
+        }
+    }
+
+    /**
+     * Feeds the shared fixture records into {@code TOPIC} through the given driver.
+     * Key "1" receives values at timestamps 100, 150 and 500; key "2" receives its second record (150)
+     * with an earlier timestamp than its first (200).
+     *
+     * @param driver the test driver whose topology consumes {@code TOPIC}
+     */
+    private void processData(final TopologyTestDriver driver) {
+        final StringSerializer keySerializer = new StringSerializer();
+        final StringSerializer valueSerializer = new StringSerializer();
+        final TestInputTopic<String, String> topic = driver.createInputTopic(TOPIC, keySerializer, valueSerializer);
+
+        // key "1"
+        topic.pipeInput("1", "1", 100L);
+        topic.pipeInput("1", "2", 150L);
+        topic.pipeInput("1", "3", 500L);
+
+        // key "2": the second record's timestamp is lower than the first's
+        topic.pipeInput("2", "10", 200L);
+        topic.pipeInput("2", "20", 150L);
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 29eb539..a65c481 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -460,6 +461,89 @@ public class SuppressScenarioTest {
}
@Test
+ public void shouldSupportFinalResultsForSlidingWindows() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KTable<Windowed<String>, Long> valueCounts = builder
+ .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
+ .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(5L), ofMillis(15L)))
+ .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled().withKeySerde(STRING_SERDE));
+ valueCounts
+ .suppress(untilWindowCloses(unbounded()))
+ .toStream()
+ .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
+ .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
+ valueCounts
+ .toStream()
+ .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
+ .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
+ final Topology topology = builder.build();
+ System.out.println(topology.describe());
+ try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);
+ inputTopic.pipeInput("k1", "v1", 10L);
+ inputTopic.pipeInput("k1", "v1", 11L);
+ inputTopic.pipeInput("k1", "v1", 10L);
+ inputTopic.pipeInput("k1", "v1", 13L);
+ inputTopic.pipeInput("k1", "v1", 10L);
+ inputTopic.pipeInput("k1", "v1", 24L);
+ // late records: no windows [0/5] or [2/7] are emitted for them (presumably already closed at stream time 24 with 15ms grace), but they still update the open windows [5/10] and [6/11] below.
+ inputTopic.pipeInput("k1", "v1", 5L);
+ inputTopic.pipeInput("k1", "v1", 7L);
+ // final record to advance stream time and flush windows
+ inputTopic.pipeInput("k1", "v1", 90L);
+ verify(
+ drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
+ asList(
+ // left window for k1@10 created when k1@10 is processed
+ new KeyValueTimestamp<>("[k1@5/10]", 1L, 10L),
+ // right window for k1@10 created when k1@11 is processed
+ new KeyValueTimestamp<>("[k1@11/16]", 1L, 11L),
+ // left window for k1@11 created when k1@11 is processed
+ new KeyValueTimestamp<>("[k1@6/11]", 2L, 11L),
+ // left window for k1@10 updated when k1@10 is processed
+ new KeyValueTimestamp<>("[k1@5/10]", 2L, 10L),
+ // left window for k1@11 updated when k1@10 is processed
+ new KeyValueTimestamp<>("[k1@6/11]", 3L, 11L),
+ // right window for k1@10 updated when k1@13 is processed
+ new KeyValueTimestamp<>("[k1@11/16]", 2L, 13L),
+ // right window for k1@11 created when k1@13 is processed
+ new KeyValueTimestamp<>("[k1@12/17]", 1L, 13L),
+ // left window for k1@13 created when k1@13 is processed
+ new KeyValueTimestamp<>("[k1@8/13]", 4L, 13L),
+ // left window for k1@10 updated when k1@10 is processed
+ new KeyValueTimestamp<>("[k1@5/10]", 3L, 10L),
+ // left window for k1@11 updated when k1@10 is processed
+ new KeyValueTimestamp<>("[k1@6/11]", 4L, 11L),
+ // left window for k1@13 updated when k1@10 is processed
+ new KeyValueTimestamp<>("[k1@8/13]", 5L, 13L),
+ // left window for k1@24 created when k1@24 is processed
+ new KeyValueTimestamp<>("[k1@19/24]", 1L, 24L),
+ // left window for k1@10 updated when k1@5 is processed
+ new KeyValueTimestamp<>("[k1@5/10]", 4L, 10L),
+ // left window for k1@10 updated when k1@7 is processed
+ new KeyValueTimestamp<>("[k1@5/10]", 5L, 10L),
+ // left window for k1@11 updated when k1@7 is processed
+ new KeyValueTimestamp<>("[k1@6/11]", 5L, 11L),
+ new KeyValueTimestamp<>("[k1@85/90]", 1L, 90L)
+ )
+ );
+ verify(
+ drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
+ asList(
+ new KeyValueTimestamp<>("[k1@5/10]", 5L, 10L),
+ new KeyValueTimestamp<>("[k1@6/11]", 5L, 11L),
+ new KeyValueTimestamp<>("[k1@8/13]", 5L, 13L),
+ new KeyValueTimestamp<>("[k1@11/16]", 2L, 13L),
+ new KeyValueTimestamp<>("[k1@12/17]", 1L, 13L),
+ new KeyValueTimestamp<>("[k1@19/24]", 1L, 24L)
+ )
+ );
+ }
+ }
+
+ @Test
public void shouldSupportFinalResultsForSessionWindows() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Windowed<String>, Long> valueCounts = builder