You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/08/31 22:23:41 UTC

[kafka] branch trunk updated: KAFKA-5636: SlidingWindows (KIP-450) (#9039)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 85b6545  KAFKA-5636: SlidingWindows (KIP-450) (#9039)
85b6545 is described below

commit 85b6545b8159885c57ab67e08b7185be8a607988
Author: leah <lt...@confluent.io>
AuthorDate: Mon Aug 31 17:22:00 2020 -0500

    KAFKA-5636: SlidingWindows (KIP-450) (#9039)
    
    Add SlidingWindows API, implementation, and tests.
    An edge case and an optimization are left to follow-on work.
    
    Implements: KIP-450
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Matthias J. Sax <mj...@apache.org>, John Roesler <vv...@apache.org>
---
 .../kafka/streams/kstream/CogroupedKStream.java    |   9 +
 .../kafka/streams/kstream/KGroupedStream.java      |   7 +
 .../kafka/streams/kstream/SlidingWindows.java      | 140 +++++
 .../kstream/internals/CogroupedKStreamImpl.java    |  14 +
 .../internals/CogroupedStreamAggregateBuilder.java |  75 ++-
 .../kstream/internals/KGroupedStreamImpl.java      |  16 +
 .../internals/KStreamSlidingWindowAggregate.java   | 307 +++++++++
 .../SessionWindowedCogroupedKStreamImpl.java       |  42 +-
 ...va => SlidingWindowedCogroupedKStreamImpl.java} | 118 ++--
 ...amImpl.java => SlidingWindowedKStreamImpl.java} | 144 ++---
 .../TimeWindowedCogroupedKStreamImpl.java          |  52 +-
 .../kstream/internals/TimeWindowedKStreamImpl.java |  45 +-
 .../internals/graph/GraphGraceSearchUtil.java      |   6 +
 .../KStreamAggregationIntegrationTest.java         | 204 ++++++
 .../kafka/streams/kstream/SlidingWindowsTest.java  |  82 +++
 .../internals/CogroupedKStreamImplTest.java        |  55 +-
 .../kstream/internals/KGroupedStreamImplTest.java  | 317 +++++++---
 .../KStreamSlidingWindowAggregateTest.java         | 692 +++++++++++++++++++++
 .../SlidingWindowedCogroupedKStreamImplTest.java   | 249 ++++++++
 .../internals/SlidingWindowedKStreamImplTest.java  | 439 +++++++++++++
 .../kstream/internals/SuppressScenarioTest.java    |  84 +++
 21 files changed, 2721 insertions(+), 376 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index 35bc8de..616b3b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -276,6 +276,15 @@ public interface CogroupedKStream<K, VOut> {
     <W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final Windows<W> windows);
 
     /**
+     * Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform sliding
+     * windowed aggregations.
+     *
+     * @param windows the specification of the aggregation {@link SlidingWindows}
+     * @return an instance of {@link TimeWindowedCogroupedKStream}
+     */
+    TimeWindowedCogroupedKStream<K, VOut> windowedBy(final SlidingWindows windows);
+
+    /**
      * Create a new {@link SessionWindowedCogroupedKStream} instance that can be used to perform session
      * windowed aggregations.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 56faea0..d805f8b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -530,6 +530,13 @@ public interface KGroupedStream<K, V> {
     <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
 
     /**
+     * Create a new {@link TimeWindowedKStream} instance that can be used to perform sliding windowed aggregations.
+     * @param windows the specification of the aggregation {@link SlidingWindows}
+     * @return an instance of {@link TimeWindowedKStream}
+     */
+    TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
+
+    /**
      * Create a new {@link SessionWindowedKStream} instance that can be used to perform session windowed aggregations.
      * @param windows the specification of the aggregation {@link SessionWindows}
      * @return an instance of {@link TimeWindowedKStream}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
new file mode 100644
index 0000000..285c7c5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import java.time.Duration;
+import java.util.Objects;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
+/**
+ * A sliding window used for aggregating events.
+ * <p>
+ * Sliding Windows are defined based on a record's timestamp, the window size based on the given maximum time difference (inclusive) between
+ * records in the same window, and the given window grace period. While the window is sliding over the input data stream, a new window is
+ * created each time a record enters the sliding window or a record drops out of the sliding window.
+ * <p>
+ * Records that come after set grace period will be ignored, i.e., a window is closed when
+ * {@code stream-time > window-end + grace-period}.
+ * <p>
+ * For example, if we have a time difference of 5000ms and the following data arrives:
+ * <pre>
+ * +--------------------------------------+
+ * |    key    |    value    |    time    |
+ * +-----------+-------------+------------+
+ * |    A      |     1       |    8000    |
+ * +-----------+-------------+------------+
+ * |    A      |     2       |    9200    |
+ * +-----------+-------------+------------+
+ * |    A      |     3       |    12400   |
+ * +-----------+-------------+------------+
+ * </pre>
+ * We'd have the following 5 windows:
+ * <ul>
+ *     <li>window {@code [3000;8000]} contains [1] (created when first record enters the window)</li>
+ *     <li>window {@code [4200;9200]} contains [1,2] (created when second record enters the window)</li>
+ *     <li>window {@code [7400;12400]} contains [1,2,3] (created when third record enters the window)</li>
+ *     <li>window {@code [8001;13001]} contains [2,3] (created when the first record drops out of the window)</li>
+ *     <li>window {@code [9201;14201]} contains [3] (created when the second record drops out of the window)</li>
+ * </ul>
+ *<p>
+ * Note that while SlidingWindows are of a fixed size, as are {@link TimeWindows}, the start and end points of the window
+ * depend on when events occur in the stream (i.e., event timestamps), similar to {@link SessionWindows}.
+ * <p>
+ * For time semantics, see {@link TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see SessionWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see KGroupedStream#windowedBy(SlidingWindows)
+ * @see CogroupedKStream#windowedBy(SlidingWindows)
+ * @see TimestampExtractor
+ */
+
+public final class SlidingWindows {
+
+    /** The size of the windows in milliseconds, defined by the max time difference between records. */
+    private final long timeDifferenceMs;
+
+    /** The grace period in milliseconds. */
+    private final long graceMs;
+
+    private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
+        this.timeDifferenceMs = timeDifferenceMs;
+        this.graceMs = graceMs;
+    }
+
+    /**
+     * Return a window definition with the window size based on the given maximum time difference (inclusive) between
+     * records in the same window and given window grace period. Reject out-of-order events that arrive after {@code grace}.
+     * A window is closed when {@code stream-time > window-end + grace-period}.
+     *
+     * @param timeDifference the max time difference (inclusive) between two records in a window
+     * @param grace the grace period to admit out-of-order events to a window
+     * @return a new window definition
+     * @throws IllegalArgumentException if the specified window size is < 0 or grace < 0, or either can't be represented as {@code long milliseconds}
+     */
+    public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference, final Duration grace) throws IllegalArgumentException {
+        final String msgPrefixSize = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        final long timeDifferenceMs = ApiUtils.validateMillisecondDuration(timeDifference, msgPrefixSize);
+        if (timeDifferenceMs < 0) {
+            throw new IllegalArgumentException("Window time difference must not be negative.");
+        }
+        final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "grace");
+        final long graceMs = ApiUtils.validateMillisecondDuration(grace, msgPrefixGrace);
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Window grace period must not be negative.");
+        }
+        return new SlidingWindows(timeDifferenceMs, graceMs);
+    }
+
+    public long timeDifferenceMs() {
+        return timeDifferenceMs;
+    }
+
+    public long gracePeriodMs() {
+        return graceMs;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final SlidingWindows that = (SlidingWindows) o;
+        return timeDifferenceMs == that.timeDifferenceMs &&
+            graceMs == that.graceMs;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(timeDifferenceMs, graceMs);
+    }
+
+    @Override
+    public String toString() {
+        return "SlidingWindows{" +
+            ", sizeMs=" + timeDifferenceMs +
+            ", graceMs=" + graceMs +
+            '}';
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
index 86ef133..ce15e7c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windows;
@@ -108,6 +109,19 @@ public class CogroupedKStreamImpl<K, VOut> extends AbstractStream<K, VOut> imple
     }
 
     @Override
+    public TimeWindowedCogroupedKStream<K, VOut> windowedBy(final SlidingWindows slidingWindows) {
+        Objects.requireNonNull(slidingWindows, "slidingWindows can't be null");
+        return new SlidingWindowedCogroupedKStreamImpl<>(
+            slidingWindows,
+            builder,
+            subTopologySourceNodes,
+            name,
+            aggregateBuilder,
+            streamsGraphNode,
+            groupPatterns);
+    }
+
+    @Override
     public SessionWindowedCogroupedKStream<K, VOut> windowedBy(final SessionWindows sessionWindows) {
         Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
         return new SessionWindowedCogroupedKStreamImpl<>(sessionWindows,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index 15051dd..03ef69d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
@@ -60,13 +61,13 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
             final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
-                    named.suffixWithOrElseGet(
-                        "-cogroup-agg-" + counter++,
-                        builder,
-                        CogroupedKStreamImpl.AGGREGATE_NAME),
-                    stateCreated,
-                    storeBuilder,
-                    new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+                named.suffixWithOrElseGet(
+                    "-cogroup-agg-" + counter++,
+                    builder,
+                    CogroupedKStreamImpl.AGGREGATE_NAME),
+                stateCreated,
+                storeBuilder,
+                new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
             stateCreated = true;
             processors.add(statefulProcessorNode);
             builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
@@ -132,6 +133,34 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
         return createTable(processors, named, keySerde, valueSerde, queryableName);
     }
 
+    <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                final Initializer<VOut> initializer,
+                                final NamedInternal named,
+                                final StoreBuilder<?> storeBuilder,
+                                final Serde<KR> keySerde,
+                                final Serde<VOut> valueSerde,
+                                final String queryableName,
+                                final SlidingWindows slidingWindows) {
+        processRepartitions(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                named.suffixWithOrElseGet(
+                    "-cogroup-agg-" + counter++,
+                    builder,
+                    CogroupedKStreamImpl.AGGREGATE_NAME),
+                stateCreated,
+                storeBuilder,
+                new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
     private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                      final StoreBuilder<?> storeBuilder) {
         for (final KGroupedStreamImpl<K, ?> repartitionReqs : groupPatterns.keySet()) {
@@ -174,7 +203,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
             CogroupedKStreamImpl.MERGE_NAME);
         final ProcessorSupplier<K, VOut> passThrough = new PassThrough<>();
         final ProcessorGraphNode<K, VOut> mergeNode =
-                new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
+            new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
 
         builder.addGraphNode(processors, mergeNode);
 
@@ -196,18 +225,18 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
         final StatefulProcessorNode<K, ?> statefulProcessorNode;
         if (!stateCreated) {
             statefulProcessorNode =
-                    new StatefulProcessorNode<>(
-                        processorName,
-                        new ProcessorParameters<>(kStreamAggregate, processorName),
-                        storeBuilder
-                    );
+                new StatefulProcessorNode<>(
+                    processorName,
+                    new ProcessorParameters<>(kStreamAggregate, processorName),
+                    storeBuilder
+                );
         } else {
             statefulProcessorNode =
-                    new StatefulProcessorNode<>(
-                        processorName,
-                        new ProcessorParameters<>(kStreamAggregate, processorName),
-                        new String[]{storeBuilder.name()}
-                    );
+                new StatefulProcessorNode<>(
+                    processorName,
+                    new ProcessorParameters<>(kStreamAggregate, processorName),
+                    new String[]{storeBuilder.name()}
+                );
         }
 
         return statefulProcessorNode;
@@ -220,11 +249,11 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                                final Serde<?> valueSerde) {
 
         KStreamImpl.createRepartitionedSource(builder,
-                keySerde,
-                (Serde<VIn>) valueSerde,
-                repartitionTopicNamePrefix,
-                null,
-                (OptimizableRepartitionNodeBuilder<K, VIn>) optimizableRepartitionNodeBuilder);
+            keySerde,
+            (Serde<VIn>) valueSerde,
+            repartitionTopicNamePrefix,
+            null,
+            (OptimizableRepartitionNodeBuilder<K, VIn>) optimizableRepartitionNodeBuilder);
 
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index f8061ca..6c5cfc4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windows;
@@ -203,6 +204,21 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
     }
 
     @Override
+    public TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows) {
+
+        return new SlidingWindowedKStreamImpl<>(
+                windows,
+                builder,
+                subTopologySourceNodes,
+                name,
+                keySerde,
+                valueSerde,
+                aggregateBuilder,
+                streamsGraphNode
+        );
+    }
+
+    @Override
     public SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows) {
 
         return new SessionWindowedKStreamImpl<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
new file mode 100644
index 0000000..3ac58c3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                threadId,
+                context.taskId().toString(),
+                internalProcessorContext.currentNode().name(),
+                metrics
+            );
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                windowStore,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (key == null || value == null) {
+                log.warn(
+                    "Skipping record due to null key or value. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            //don't process records that don't fall within a full sliding window
+            if (timestamp < windows.timeDifferenceMs()) {
+                log.warn(
+                    "Skipping record due to early arrival. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+            processInOrder(key, value, timestamp);
+        }
+
+        public void processInOrder(final K key, final V value, final long timestamp) {
+
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+
+            //store start times of windows we find
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            // keep the left type window closest to the record
+            Window latestLeftTypeWindow = null;
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    timestamp - 2 * windows.timeDifferenceMs(),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+
+                    if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        if (isLeftWindow(next)) {
+                            latestLeftTypeWindow = next.key.window();
+                        }
+                    } else if (endTime == timestamp) {
+                        leftWinAlreadyCreated = true;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (latestLeftTypeWindow != null) {
+                final long rightWinStart = latestLeftTypeWindow.end() + 1;
+                if (!windowStartTimes.contains(rightWinStart)) {
+                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+                }
+            }
+
+            //create left window for new record
+            if (!leftWinAlreadyCreated) {
+                final ValueAndTimestamp<Agg> valueAndTime;
+                //there's a right window that the new record could create --> new record's left window is not empty
+                if (latestLeftTypeWindow != null) {
+                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
+                } else {
+                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                }
+                final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+            //create right window for new record
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+        }
+
+        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) {
+            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        }
+
+        private boolean isLeftWindow(final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> window) {
+            return window.key.window().end() == window.value.timestamp();
+        }
+
+        private void putAndForward(final Window window,
+                                   final ValueAndTimestamp<Agg> valueAndTime,
+                                   final K key,
+                                   final V value,
+                                   final long closeTime,
+                                   final long timestamp) {
+            final long windowStart = window.start();
+            final long windowEnd = window.end();
+            if (windowEnd > closeTime) {
+                //get aggregate from existing window
+                final Agg oldAgg = getValueOrNull(valueAndTime);
+                final Agg newAgg;
+                // keep old aggregate if adding a right window, else add new record's value
+                if (windowStart == timestamp + 1) {
+                    newAgg = oldAgg;
+                } else {
+                    newAgg = aggregator.apply(key, value, oldAgg);
+                }
+                final long newTimestamp = oldAgg == null ? timestamp : Math.max(timestamp, valueAndTime.timestamp());
+                windowStore.put(
+                    key,
+                    ValueAndTimestamp.make(newAgg, newTimestamp),
+                    windowStart);
+                tupleForwarder.maybeForward(
+                    new Windowed<K>(key, window),
+                    newAgg,
+                    sendOldValues ? oldAgg : null,
+                    newTimestamp);
+            } else {
+                log.warn(
+                    "Skipping record for expired window. " +
+                        "key=[{}] " +
+                        "topic=[{}] " +
+                        "partition=[{}] " +
+                        "offset=[{}] " +
+                        "timestamp=[{}] " +
+                        "window=[{},{}] " +
+                        "expiration=[{}] " +
+                        "streamTime=[{}]",
+                    key,
+                    context().topic(),
+                    context().partition(),
+                    context().offset(),
+                    context().timestamp(),
+                    windowStart, windowEnd,
+                    closeTime,
+                    observedStreamTime
+                );
+                lateRecordDropSensor.record();
+            }
+        }
+    }
+
+    @Override
+    public KTableValueGetterSupplier<Windowed<K>, Agg> view() {
+        return new KTableValueGetterSupplier<Windowed<K>, Agg>() {
+
+            public KTableValueGetter<Windowed<K>, Agg> get() {
+                return new KStreamWindowAggregateValueGetter();
+            }
+
+            @Override
+            public String[] storeNames() {
+                return new String[] {storeName};
+            }
+        };
+    }
+
+    private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public ValueAndTimestamp<Agg> get(final Windowed<K> windowedKey) {
+            final K key = windowedKey.key();
+            return windowStore.fetch(key, windowedKey.window().start());
+        }
+
+        @Override
+        public void close() {}
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
index 20a1b08..cc50131 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
@@ -88,22 +88,22 @@ public class SessionWindowedCogroupedKStreamImpl<K, V> extends
         Objects.requireNonNull(materialized, "materialized can't be null");
         Objects.requireNonNull(named, "named can't be null");
         final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(
-                materialized,
-                builder,
-                CogroupedKStreamImpl.AGGREGATE_NAME);
+            materialized,
+            builder,
+            CogroupedKStreamImpl.AGGREGATE_NAME);
         return aggregateBuilder.build(
-                groupPatterns,
-                initializer,
-                new NamedInternal(named),
-                materialize(materializedInternal),
-                materializedInternal.keySerde() != null ?
-                        new WindowedSerdes.SessionWindowedSerde<>(
-                                materializedInternal.keySerde()) :
-                        null,
-                materializedInternal.valueSerde(),
-                materializedInternal.queryableStoreName(),
-                sessionWindows,
-                sessionMerger);
+            groupPatterns,
+            initializer,
+            new NamedInternal(named),
+            materialize(materializedInternal),
+            materializedInternal.keySerde() != null ?
+                new WindowedSerdes.SessionWindowedSerde<>(
+                    materializedInternal.keySerde()) :
+                null,
+            materializedInternal.valueSerde(),
+            materializedInternal.queryableStoreName(),
+            sessionWindows,
+            sessionMerger);
     }
 
 
@@ -118,12 +118,12 @@ public class SessionWindowedCogroupedKStreamImpl<K, V> extends
 
             if ((sessionWindows.inactivityGap() + sessionWindows.gracePeriodMs()) > retentionPeriod) {
                 throw new IllegalArgumentException("The retention period of the session store "
-                                                       + materialized.storeName()
-                                                       + " must be no smaller than the session inactivity gap plus the"
-                                                       + " grace period."
-                                                       + " Got gap=[" + sessionWindows.inactivityGap() + "],"
-                                                       + " grace=[" + sessionWindows.gracePeriodMs() + "],"
-                                                       + " retention=[" + retentionPeriod + "]");
+                    + materialized.storeName()
+                    + " must be no smaller than the session inactivity gap plus the"
+                    + " grace period."
+                    + " Got gap=[" + sessionWindows.inactivityGap() + "],"
+                    + " grace=[" + sessionWindows.gracePeriodMs() + "],"
+                    + " retention=[" + retentionPeriod + "]");
             }
             supplier = Stores.persistentSessionStore(
                 materialized.storeName(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
similarity index 50%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
index f6169bd..54ed885 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
@@ -23,37 +23,32 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
-import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
-
 import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
-public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends AbstractStream<K, V>
-        implements TimeWindowedCogroupedKStream<K, V> {
-
-    private final Windows<W> windows;
+public class SlidingWindowedCogroupedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedCogroupedKStream<K, V> {
+    private final SlidingWindows windows;
     private final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder;
     private final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns;
 
-    TimeWindowedCogroupedKStreamImpl(final Windows<W> windows,
-                                     final InternalStreamsBuilder builder,
-                                     final Set<String> subTopologySourceNodes,
-                                     final String name,
-                                     final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder,
-                                     final StreamsGraphNode streamsGraphNode,
-                                     final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns) {
+    SlidingWindowedCogroupedKStreamImpl(final SlidingWindows windows,
+                                        final InternalStreamsBuilder builder,
+                                        final Set<String> subTopologySourceNodes,
+                                        final String name,
+                                        final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                                        final StreamsGraphNode streamsGraphNode,
+                                        final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns) {
         super(name, null, null, subTopologySourceNodes, streamsGraphNode, builder);
         //keySerde and valueSerde are null because there are many different groupStreams that they could be from
         this.windows = windows;
@@ -61,7 +56,6 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
         this.groupPatterns = groupPatterns;
     }
 
-
     @Override
     public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer) {
         return aggregate(initializer, Materialized.with(null, null));
@@ -91,71 +85,41 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
             builder,
             CogroupedKStreamImpl.AGGREGATE_NAME);
         return aggregateBuilder.build(
-                groupPatterns,
-                initializer,
-                new NamedInternal(named),
-                materialize(materializedInternal),
-                materializedInternal.keySerde() != null ?
-                        new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size())
-                        : null,
-                materializedInternal.valueSerde(),
-                materializedInternal.queryableStoreName(),
-                windows);
+            groupPatterns,
+            initializer,
+            new NamedInternal(named),
+            materialize(materializedInternal),
+            materializedInternal.keySerde() != null ?
+                new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs())
+                : null,
+            materializedInternal.valueSerde(),
+            materializedInternal.queryableStoreName(),
+            windows);
     }
 
-    @SuppressWarnings("deprecation")
-    // continuing to support Windows#maintainMs/segmentInterval in fallback mode
-    private StoreBuilder<TimestampedWindowStore<K, V>> materialize(
-            final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized) {
+    private StoreBuilder<TimestampedWindowStore<K, V>> materialize(final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized) {
         WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
-            if (materialized.retention() != null) {
-                // new style retention: use Materialized retention and default segmentInterval
-                final long retentionPeriod = materialized.retention().toMillis();
-
-                if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
-                    throw new IllegalArgumentException("The retention period of the window store "
-                                                       + name
-                                                       + " must be no smaller than its window size plus the grace period."
-                                                       + " Got size=[" + windows.size() + "],"
-                                                       + " grace=[" + windows.gracePeriodMs()
-                                                       + "],"
-                                                       + " retention=[" + retentionPeriod
-                                                       + "]");
-                }
-
-                supplier = Stores.persistentTimestampedWindowStore(
-                    materialized.storeName(),
-                    Duration.ofMillis(retentionPeriod),
-                    Duration.ofMillis(windows.size()),
-                    false
-                );
-
-            } else {
-                // old style retention: use deprecated Windows retention/segmentInterval.
-
-                // NOTE: in the future, when we remove Windows#maintainMs(), we should set the default retention
-                // to be (windows.size() + windows.grace()). This will yield the same default behavior.
+            final long retentionPeriod = materialized.retention() != null ? materialized.retention().toMillis() : windows.gracePeriodMs() + 2 * windows.timeDifferenceMs();
+
+            if ((windows.timeDifferenceMs() * 2 + windows.gracePeriodMs()) > retentionPeriod) {
+                throw new IllegalArgumentException("The retention period of the window store "
+                    + name
+                    + " must be no smaller than 2 * time difference plus the grace period."
+                    + " Got time difference=[" + windows.timeDifferenceMs() + "],"
+                    + " grace=[" + windows.gracePeriodMs()
+                    + "],"
+                    + " retention=[" + retentionPeriod
+                    + "]");
+            }
 
-                if ((windows.size() + windows.gracePeriodMs()) > windows.maintainMs()) {
-                    throw new IllegalArgumentException("The retention period of the window store "
-                                                       + name
-                                                       + " must be no smaller than its window size plus the grace period."
-                                                       + " Got size=[" + windows.size() + "],"
-                                                       + " grace=[" + windows.gracePeriodMs()
-                                                       + "],"
-                                                       + " retention=[" + windows.maintainMs()
-                                                       + "]");
-                }
+            supplier = Stores.persistentTimestampedWindowStore(
+                materialized.storeName(),
+                Duration.ofMillis(retentionPeriod),
+                Duration.ofMillis(windows.timeDifferenceMs()),
+                false
+            );
 
-                supplier = new RocksDbWindowBytesStoreSupplier(
-                    materialized.storeName(),
-                    windows.maintainMs(),
-                    Math.max(windows.maintainMs() / (windows.segments - 1), 60_000L),
-                    windows.size(),
-                    false,
-                    true);
-            }
         }
         final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores
             .timestampedWindowStoreBuilder(
@@ -169,10 +133,12 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
         } else {
             builder.withLoggingDisabled();
         }
-
         if (materialized.cachingEnabled()) {
             builder.withCachingEnabled();
+        } else {
+            builder.withCachingDisabled();
         }
         return builder;
     }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
similarity index 61%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
index aaf8fbc..0fc8da1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
@@ -25,38 +25,33 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindowedKStream;
-import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
-
 import java.time.Duration;
 import java.util.Objects;
 import java.util.Set;
-
 import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
 import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
 
-public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
-
-    private final Windows<W> windows;
+public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
+    private final SlidingWindows windows;
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
 
-    TimeWindowedKStreamImpl(final Windows<W> windows,
-                            final InternalStreamsBuilder builder,
-                            final Set<String> subTopologySourceNodes,
-                            final String name,
-                            final Serde<K> keySerde,
-                            final Serde<V> valueSerde,
-                            final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
-                            final StreamsGraphNode streamsGraphNode) {
+    SlidingWindowedKStreamImpl(final SlidingWindows windows,
+                               final InternalStreamsBuilder builder,
+                               final Set<String> subTopologySourceNodes,
+                               final String name,
+                               final Serde<K> keySerde,
+                               final Serde<V> valueSerde,
+                               final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                               final StreamsGraphNode streamsGraphNode) {
         super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder);
         this.windows = Objects.requireNonNull(windows, "windows can't be null");
         this.aggregateBuilder = aggregateBuilder;
@@ -72,7 +67,6 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         return doCount(named, Materialized.with(keySerde, Serdes.Long()));
     }
 
-
     @Override
     public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
         return count(NamedInternal.empty(), materialized);
@@ -81,20 +75,13 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
     @Override
     public KTable<Windowed<K>, Long> count(final Named named, final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
-
-        // TODO: remove this when we do a topology-incompatible release
-        // we used to burn a topology name here, so we have to keep doing it for compatibility
-        if (new MaterializedInternal<>(materialized).storeName() == null) {
-            builder.newStoreName(AGGREGATE_NAME);
-        }
-
         return doCount(named, materialized);
     }
 
     private KTable<Windowed<K>, Long> doCount(final Named named,
                                               final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
         final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal =
-            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+                new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
 
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
@@ -104,13 +91,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         }
 
         final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
         return aggregateBuilder.build(
-            new NamedInternal(aggregateName),
-            materialize(materializedInternal),
-            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
-            materializedInternal.queryableStoreName(),
-            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
-            materializedInternal.valueSerde());
+                new NamedInternal(aggregateName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null,
+                materializedInternal.valueSerde());
     }
 
     @Override
@@ -126,7 +114,6 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         return aggregate(initializer, aggregator, named, Materialized.with(keySerde, null));
     }
 
-
     @Override
     public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                   final Aggregator<? super K, ? super V, VR> aggregator,
@@ -143,19 +130,19 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal =
-            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+                new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
-
         final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
         return aggregateBuilder.build(
-            new NamedInternal(aggregateName),
-            materialize(materializedInternal),
-            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
-            materializedInternal.queryableStoreName(),
-            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
-            materializedInternal.valueSerde());
+                new NamedInternal(aggregateName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null,
+                materializedInternal.valueSerde());
     }
 
     @Override
@@ -183,7 +170,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         Objects.requireNonNull(materialized, "materialized can't be null");
 
         final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal =
-            new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
+                new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
 
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
@@ -193,65 +180,41 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         }
 
         final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+
         return aggregateBuilder.build(
-            new NamedInternal(reduceName),
-            materialize(materializedInternal),
-            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
-            materializedInternal.queryableStoreName(),
-            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
-            materializedInternal.valueSerde());
+                new NamedInternal(reduceName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null,
+                materializedInternal.valueSerde());
     }
 
-    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
     private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
         WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
-            if (materialized.retention() != null) {
-                // new style retention: use Materialized retention and default segmentInterval
-                final long retentionPeriod = materialized.retention().toMillis();
-
-                if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
-                    throw new IllegalArgumentException("The retention period of the window store "
-                                                           + name + " must be no smaller than its window size plus the grace period."
-                                                           + " Got size=[" + windows.size() + "],"
-                                                           + " grace=[" + windows.gracePeriodMs() + "],"
-                                                           + " retention=[" + retentionPeriod + "]");
-                }
-
-                supplier = Stores.persistentTimestampedWindowStore(
+            final long retentionPeriod = materialized.retention() != null ? materialized.retention().toMillis() : windows.gracePeriodMs() + 2 * windows.timeDifferenceMs();
+
+            // large retention time to ensure that all existing windows needed to create new sliding windows can be accessed
+            // earliest window start time we could need to create corresponding right window would be recordTime - 2 * timeDifference
+            if ((windows.timeDifferenceMs() * 2 + windows.gracePeriodMs()) > retentionPeriod) {
+                throw new IllegalArgumentException("The retention period of the window store "
+                        + name + " must be no smaller than 2 * time difference plus the grace period."
+                        + " Got time difference=[" + windows.timeDifferenceMs() + "],"
+                        + " grace=[" + windows.gracePeriodMs() + "],"
+                        + " retention=[" + retentionPeriod + "]");
+            }
+            supplier = Stores.persistentTimestampedWindowStore(
                     materialized.storeName(),
                     Duration.ofMillis(retentionPeriod),
-                    Duration.ofMillis(windows.size()),
+                    Duration.ofMillis(windows.timeDifferenceMs()),
                     false
-                );
-
-            } else {
-                // old style retention: use deprecated Windows retention/segmentInterval.
-
-                // NOTE: in the future, when we remove Windows#maintainMs(), we should set the default retention
-                // to be (windows.size() + windows.grace()). This will yield the same default behavior.
-
-                if ((windows.size() + windows.gracePeriodMs()) > windows.maintainMs()) {
-                    throw new IllegalArgumentException("The retention period of the window store "
-                                                           + name + " must be no smaller than its window size plus the grace period."
-                                                           + " Got size=[" + windows.size() + "],"
-                                                           + " grace=[" + windows.gracePeriodMs() + "],"
-                                                           + " retention=[" + windows.maintainMs() + "]");
-                }
-
-                supplier = new RocksDbWindowBytesStoreSupplier(
-                    materialized.storeName(),
-                    windows.maintainMs(),
-                    Math.max(windows.maintainMs() / (windows.segments - 1), 60_000L),
-                    windows.size(),
-                    false,
-                    true);
-            }
+            );
         }
         final StoreBuilder<TimestampedWindowStore<K, VR>> builder = Stores.timestampedWindowStoreBuilder(
-            supplier,
-            materialized.keySerde(),
-            materialized.valueSerde()
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde()
         );
 
         if (materialized.loggingEnabled()) {
@@ -259,9 +222,10 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         } else {
             builder.withLoggingDisabled();
         }
-
         if (materialized.cachingEnabled()) {
             builder.withCachingEnabled();
+        } else {
+            builder.withCachingDisabled();
         }
         return builder;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
index f6169bd..299c2d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
@@ -41,7 +41,7 @@ import java.util.Objects;
 import java.util.Set;
 
 public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends AbstractStream<K, V>
-        implements TimeWindowedCogroupedKStream<K, V> {
+    implements TimeWindowedCogroupedKStream<K, V> {
 
     private final Windows<W> windows;
     private final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder;
@@ -91,22 +91,22 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
             builder,
             CogroupedKStreamImpl.AGGREGATE_NAME);
         return aggregateBuilder.build(
-                groupPatterns,
-                initializer,
-                new NamedInternal(named),
-                materialize(materializedInternal),
-                materializedInternal.keySerde() != null ?
-                        new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size())
-                        : null,
-                materializedInternal.valueSerde(),
-                materializedInternal.queryableStoreName(),
-                windows);
+            groupPatterns,
+            initializer,
+            new NamedInternal(named),
+            materialize(materializedInternal),
+            materializedInternal.keySerde() != null ?
+                new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size())
+                : null,
+            materializedInternal.valueSerde(),
+            materializedInternal.queryableStoreName(),
+            windows);
     }
 
     @SuppressWarnings("deprecation")
     // continuing to support Windows#maintainMs/segmentInterval in fallback mode
     private StoreBuilder<TimestampedWindowStore<K, V>> materialize(
-            final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized) {
         WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
             if (materialized.retention() != null) {
@@ -115,13 +115,13 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
 
                 if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
                     throw new IllegalArgumentException("The retention period of the window store "
-                                                       + name
-                                                       + " must be no smaller than its window size plus the grace period."
-                                                       + " Got size=[" + windows.size() + "],"
-                                                       + " grace=[" + windows.gracePeriodMs()
-                                                       + "],"
-                                                       + " retention=[" + retentionPeriod
-                                                       + "]");
+                        + name
+                        + " must be no smaller than its window size plus the grace period."
+                        + " Got size=[" + windows.size() + "],"
+                        + " grace=[" + windows.gracePeriodMs()
+                        + "],"
+                        + " retention=[" + retentionPeriod
+                        + "]");
                 }
 
                 supplier = Stores.persistentTimestampedWindowStore(
@@ -139,13 +139,13 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
 
                 if ((windows.size() + windows.gracePeriodMs()) > windows.maintainMs()) {
                     throw new IllegalArgumentException("The retention period of the window store "
-                                                       + name
-                                                       + " must be no smaller than its window size plus the grace period."
-                                                       + " Got size=[" + windows.size() + "],"
-                                                       + " grace=[" + windows.gracePeriodMs()
-                                                       + "],"
-                                                       + " retention=[" + windows.maintainMs()
-                                                       + "]");
+                        + name
+                        + " must be no smaller than its window size plus the grace period."
+                        + " Got size=[" + windows.size() + "],"
+                        + " grace=[" + windows.gracePeriodMs()
+                        + "],"
+                        + " retention=[" + windows.maintainMs()
+                        + "]");
                 }
 
                 supplier = new RocksDbWindowBytesStoreSupplier(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index aaf8fbc..3e6b03e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -104,13 +104,16 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         }
 
         final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
         return aggregateBuilder.build(
-            new NamedInternal(aggregateName),
-            materialize(materializedInternal),
-            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
-            materializedInternal.queryableStoreName(),
-            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
-            materializedInternal.valueSerde());
+                new NamedInternal(aggregateName),
+                materialize(materializedInternal),
+                new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
+                materializedInternal.valueSerde());
+
+
     }
 
     @Override
@@ -149,13 +152,16 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         }
 
         final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
         return aggregateBuilder.build(
-            new NamedInternal(aggregateName),
-            materialize(materializedInternal),
-            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
-            materializedInternal.queryableStoreName(),
-            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
-            materializedInternal.valueSerde());
+                new NamedInternal(aggregateName),
+                materialize(materializedInternal),
+                new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
+                materializedInternal.valueSerde());
+
+
     }
 
     @Override
@@ -193,13 +199,16 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         }
 
         final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+
         return aggregateBuilder.build(
-            new NamedInternal(reduceName),
-            materialize(materializedInternal),
-            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
-            materializedInternal.queryableStoreName(),
-            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
-            materializedInternal.valueSerde());
+                new NamedInternal(reduceName),
+                materialize(materializedInternal),
+                new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
+                materializedInternal.valueSerde());
+
+
     }
 
     @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
index 2fb28dd..e9a4796 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
@@ -18,8 +18,10 @@ package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
+import org.apache.kafka.streams.kstream.internals.KStreamSlidingWindowAggregate;
 import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
@@ -79,6 +81,10 @@ public final class GraphGraceSearchUtil {
                 final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier;
                 final SessionWindows windows = kStreamSessionWindowAggregate.windows();
                 return windows.gracePeriodMs() + windows.inactivityGap();
+            } else if (processorSupplier instanceof KStreamSlidingWindowAggregate) {
+                final KStreamSlidingWindowAggregate kStreamSlidingWindowAggregate = (KStreamSlidingWindowAggregate) processorSupplier;
+                final SlidingWindows windows = kStreamSlidingWindowAggregate.windows();
+                return windows.gracePeriodMs();
             } else {
                 return null;
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 7de4f62..5cd08f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -46,6 +46,7 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Transformer;
@@ -87,6 +88,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static java.time.Duration.ofMillis;
+import static java.time.Duration.ofMinutes;
 import static java.time.Instant.ofEpochMilli;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -460,6 +462,208 @@ public class KStreamAggregationIntegrationTest {
     }
 
     @Test
+    public void shouldReduceSlidingWindows() throws Exception {
+        final long firstBatchTimestamp = mockTime.milliseconds();
+        final long timeDifference = 500L;
+        produceMessages(firstBatchTimestamp);
+        final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2;
+        produceMessages(secondBatchTimestamp);
+        final long thirdBatchTimestamp = firstBatchTimestamp + timeDifference - 100L;
+        produceMessages(thirdBatchTimestamp);
+
+        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+        groupedStream
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L)))
+                .reduce(reducer)
+                .toStream()
+                .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
+
+        startStreams();
+
+        final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput = receiveMessages(
+                new TimeWindowedDeserializer<>(new StringDeserializer(), 500L),
+                new StringDeserializer(),
+                String.class,
+                25);
+
+        final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+                new TimeWindowedDeserializer<String>(),
+                new StringDeserializer(),
+                String.class,
+                25,
+                true);
+
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+                        .thenComparing(KeyValueTimestamp::value);
+
+        windowedOutput.sort(comparator);
+        final long firstBatchLeftWindowStart = firstBatchTimestamp - timeDifference;
+        final long firstBatchRightWindowStart = firstBatchTimestamp + 1;
+
+        final long secondBatchLeftWindowStart = secondBatchTimestamp - timeDifference;
+        final long secondBatchRightWindowStart = secondBatchTimestamp + 1;
+
+        final long thirdBatchLeftWindowStart = thirdBatchTimestamp - timeDifference;
+
+
+        final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList(
+                // A @ firstBatchTimestamp left window created when A @ firstBatchTimestamp processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "A", firstBatchTimestamp),
+                // A @ firstBatchTimestamp right window created when A @ secondBatchTimestamp processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "A", secondBatchTimestamp),
+                // A @ secondBatchTimestamp right window created when A @ thirdBatchTimestamp processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "A", thirdBatchTimestamp),
+                // A @ secondBatchTimestamp left window created when A @ secondBatchTimestamp processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "A:A", secondBatchTimestamp),
+                // A @ firstBatchTimestamp right window updated when A @ thirdBatchTimestamp processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "A:A", thirdBatchTimestamp),
+                // A @ thirdBatchTimestamp left window created when A @ thirdBatchTimestamp processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "A:A:A", thirdBatchTimestamp),
+
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "B", firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "B", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "B", thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "B:B", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "B:B", thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "B:B:B", thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "C", firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "C", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "C", thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "C:C", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "C:C", thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "C:C:C", thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "D", firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "D", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "D", thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "D:D", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "D:D", thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "D:D:D", thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchLeftWindowStart, Long.MAX_VALUE)), "E", firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "E", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchRightWindowStart, Long.MAX_VALUE)), "E", thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchLeftWindowStart, Long.MAX_VALUE)), "E:E", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindowStart, Long.MAX_VALUE)), "E:E", thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(thirdBatchLeftWindowStart, Long.MAX_VALUE)), "E:E:E", thirdBatchTimestamp)
+        );
+        assertThat(windowedOutput, is(expectResult));
+
+        final Set<String> expectResultString = new HashSet<>(expectResult.size());
+        for (final KeyValueTimestamp<Windowed<String>, String> eachRecord: expectResult) {
+            expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", "
+                    + eachRecord.key() + ", " + eachRecord.value());
+        }
+
+        // check every message is contained in the expect result
+        final String[] allRecords = resultFromConsoleConsumer.split("\n");
+        for (final String record: allRecords) {
+            assertTrue(expectResultString.contains(record));
+        }
+    }
+
+    @Test
+    public void shouldAggregateSlidingWindows() throws Exception {
+        final long firstBatchTimestamp = mockTime.milliseconds();
+        final long timeDifference = 500L;
+        produceMessages(firstBatchTimestamp);
+        final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2;
+        produceMessages(secondBatchTimestamp);
+        final long thirdBatchTimestamp = firstBatchTimestamp + timeDifference - 100L;
+        produceMessages(thirdBatchTimestamp);
+
+        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+        groupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMinutes(5)))
+                .aggregate(
+                        initializer,
+                        aggregator,
+                        Materialized.with(null, Serdes.Integer())
+                )
+                .toStream()
+                .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
+
+        startStreams();
+
+        final List<KeyValueTimestamp<Windowed<String>, Integer>> windowedMessages = receiveMessagesWithTimestamp(
+                new TimeWindowedDeserializer<>(),
+                new IntegerDeserializer(),
+                String.class,
+                15);
+
+        // read from ConsoleConsumer
+        final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+                new TimeWindowedDeserializer<String>(),
+                new IntegerDeserializer(),
+                String.class,
+                15,
+                true);
+
+        final Comparator<KeyValueTimestamp<Windowed<String>, Integer>> comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, Integer> o) -> o.key().key())
+                        .thenComparingInt(KeyValueTimestamp::value);
+        windowedMessages.sort(comparator);
+
+        final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference;
+        final long firstBatchRightWindow = firstBatchTimestamp + 1;
+        final long secondBatchLeftWindow = secondBatchTimestamp - timeDifference;
+        final long secondBatchRightWindow = secondBatchTimestamp + 1;
+        final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference;
+
+        final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult = Arrays.asList(
+                // A @ firstBatchTimestamp left window created when A @ firstBatchTimestamp processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp),
+                // A @ firstBatchTimestamp right window created when A @ secondBatchTimestamp processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp),
+                // A @ secondBatchTimestamp right window created when A @ thirdBatchTimestamp processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp),
+                // A @ secondBatchTimestamp left window created when A @ secondBatchTimestamp processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp),
+                // A @ firstBatchTimestamp right window updated when A @ thirdBatchTimestamp processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp),
+                // A @ thirdBatchTimestamp left window created when A @ thirdBatchTimestamp processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp),
+
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), 1, firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 1, secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchRightWindow, Long.MAX_VALUE)), 1, thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchLeftWindow, Long.MAX_VALUE)), 2, secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchRightWindow, Long.MAX_VALUE)), 2, thirdBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(thirdBatchLeftWindow, Long.MAX_VALUE)), 3, thirdBatchTimestamp)
+        );
+
+        assertThat(windowedMessages, is(expectResult));
+
+        final Set<String> expectResultString = new HashSet<>(expectResult.size());
+        for (final KeyValueTimestamp<Windowed<String>, Integer> eachRecord: expectResult) {
+            expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", " + eachRecord.key() + ", " + eachRecord.value());
+        }
+
+        // check every message is contained in the expect result
+        final String[] allRecords = resultFromConsoleConsumer.split("\n");
+        for (final String record: allRecords) {
+            assertTrue(expectResultString.contains(record));
+        }
+
+    }
+
+    @Test
     public void shouldCountSessionWindows() throws Exception {
         final long sessionGap = 5 * 60 * 1000L;
         final List<KeyValue<String, String>> t1Messages = Arrays.asList(new KeyValue<>("bob", "start"),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
new file mode 100644
index 0000000..f6c63a3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.junit.Test;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
+import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class SlidingWindowsTest {
+
+    private static final long ANY_SIZE = 123L;
+
+    @Test
+    public void shouldSetTimeDifference() {
+        assertEquals(ANY_SIZE, SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE), ofMillis(3)).timeDifferenceMs());
+    }
+
+    @Test
+    public void timeDifferenceMustNotBeNegative() {
+        assertThrows(IllegalArgumentException.class, () ->  SlidingWindows.withTimeDifferenceAndGrace(ofMillis(-1), ofMillis(5)));
+    }
+
+    @Test
+    public void shouldSetGracePeriod() {
+        assertEquals(ANY_SIZE, SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(ANY_SIZE)).gracePeriodMs());
+    }
+
+    @Test
+    public void gracePeriodMustNotBeNegative() {
+        assertThrows(IllegalArgumentException.class, () ->  SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(-1)));
+    }
+
+    @Test
+    public void equalsAndHashcodeShouldBeValidForPositiveCases() {
+        final long grace = 1L + (long) (Math.random() * (20L - 1L));
+        final long timeDifference = 1L + (long) (Math.random() * (20L - 1L));
+        verifyEquality(
+                SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(grace)),
+                SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(grace))
+        );
+    }
+
+    @Test
+    public void equalsAndHashcodeShouldNotBeEqualForDifferentTimeDifference() {
+        final long grace = 1L + (long) (Math.random() * (10L - 1L));
+        final long timeDifferenceOne = 1L + (long) (Math.random() * (10L - 1L));
+        final long timeDifferenceTwo = 21L + (long) (Math.random() * (41L - 21L));
+        verifyInEquality(
+                SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifferenceOne), ofMillis(grace)),
+                SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifferenceTwo), ofMillis(grace))
+        );
+    }
+
+    @Test
+    public void equalsAndHashcodeShouldNotBeEqualForDifferentGracePeriod() {
+        final long timeDifference = 1L + (long) (Math.random() * (10L - 1L));
+        final long graceOne = 1L + (long) (Math.random() * (10L - 1L));
+        final long graceTwo = 21L + (long) (Math.random() * (41L - 21L));
+        verifyInEquality(
+                SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(graceOne)),
+                SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(graceTwo))
+        );
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
index d780fa7..eb73b77 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
 
 import java.util.Properties;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -43,6 +44,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -85,64 +87,69 @@ public class CogroupedKStreamImplTest {
         cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER);
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNPEInCogroupIfKGroupedStreamIsNull() {
-        cogroupedStream.cogroup(null, MockAggregator.TOSTRING_ADDER);
+        assertThrows(NullPointerException.class, () -> cogroupedStream.cogroup(null, MockAggregator.TOSTRING_ADDER));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullAggregatorOnCogroup() {
-        cogroupedStream.cogroup(groupedStream, null);
+        assertThrows(NullPointerException.class, () -> cogroupedStream.cogroup(groupedStream, null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullInitializerOnAggregate() {
-        cogroupedStream.aggregate(null);
+        assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullInitializerOnAggregateWitNamed() {
-        cogroupedStream.aggregate(null, Named.as("name"));
+        assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(null, Named.as("name")));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullInitializerOnAggregateWitMaterialized() {
-        cogroupedStream.aggregate(null, Materialized.as("store"));
+        assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(null, Materialized.as("store")));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullInitializerOnAggregateWitNamedAndMaterialized() {
-        cogroupedStream.aggregate(null, Named.as("name"), Materialized.as("store"));
+        assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(null, Named.as("name"), Materialized.as("store")));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullNamedOnAggregate() {
-        cogroupedStream.aggregate(STRING_INITIALIZER, (Named) null);
+        assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(STRING_INITIALIZER, (Named) null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullMaterializedOnAggregate() {
-        cogroupedStream.aggregate(STRING_INITIALIZER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null);
+        assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(STRING_INITIALIZER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullNamedOnAggregateWithMateriazlied() {
-        cogroupedStream.aggregate(STRING_INITIALIZER,  null,  Materialized.as("store"));
+        assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(STRING_INITIALIZER,  null,  Materialized.as("store")));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullMaterializedOnAggregateWithNames() {
-        cogroupedStream.aggregate(STRING_INITIALIZER, Named.as("name"), null);
+        assertThrows(NullPointerException.class, () -> cogroupedStream.aggregate(STRING_INITIALIZER, Named.as("name"), null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullWindowOnWindowedByTime() {
-        cogroupedStream.windowedBy((Windows<? extends Window>) null);
+        assertThrows(NullPointerException.class, () -> cogroupedStream.windowedBy((Windows<? extends Window>) null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullWindowOnWindowedBySession() {
-        cogroupedStream.windowedBy((SessionWindows) null);
+        assertThrows(NullPointerException.class, () -> cogroupedStream.windowedBy((SessionWindows) null));
+    }
+
+    @Test
+    public void shouldNotHaveNullWindowOnWindowedBySliding() {
+        assertThrows(NullPointerException.class, () -> cogroupedStream.windowedBy((SlidingWindows) null));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 34de30c..a23b98d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
@@ -61,6 +62,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
 
 public class KGroupedStreamImplTest {
 
@@ -77,82 +80,212 @@ public class KGroupedStreamImplTest {
         groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullAggregatorOnCogroup() {
-        groupedStream.cogroup(null);
+        assertThrows(NullPointerException.class, () ->  groupedStream.cogroup(null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullReducerOnReduce() {
-        groupedStream.reduce(null);
+        assertThrows(NullPointerException.class, () ->  groupedStream.reduce(null));
     }
 
-    @Test(expected = TopologyException.class)
+    @Test
     public void shouldNotHaveInvalidStoreNameOnReduce() {
-        groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
+        assertThrows(TopologyException.class, () ->  groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullReducerWithWindowedReduce() {
-        groupedStream
-            .windowedBy(TimeWindows.of(ofMillis(10)))
-            .reduce(null, Materialized.as("store"));
+        assertThrows(NullPointerException.class, () ->  groupedStream
+                .windowedBy(TimeWindows.of(ofMillis(10)))
+                .reduce(null, Materialized.as("store")));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullWindowsWithWindowedReduce() {
-        groupedStream.windowedBy((Windows<?>) null);
+        assertThrows(NullPointerException.class, () ->  groupedStream.windowedBy((Windows<?>) null));
     }
 
-    @Test(expected = TopologyException.class)
+    @Test
     public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
-        groupedStream
-            .windowedBy(TimeWindows.of(ofMillis(10)))
-            .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
+        assertThrows(TopologyException.class, () ->  groupedStream
+                .windowedBy(TimeWindows.of(ofMillis(10)))
+                .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullInitializerOnAggregate() {
-        groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store"));
+        assertThrows(NullPointerException.class, () ->  groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullAdderOnAggregate() {
-        groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store"));
+        assertThrows(NullPointerException.class, () ->  groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
     }
 
-    @Test(expected = TopologyException.class)
+    @Test
     public void shouldNotHaveInvalidStoreNameOnAggregate() {
-        groupedStream.aggregate(
-            MockInitializer.STRING_INIT,
-            MockAggregator.TOSTRING_ADDER,
-            Materialized.as(INVALID_STORE_NAME));
+        assertThrows(TopologyException.class, () ->  groupedStream.aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.as(INVALID_STORE_NAME)));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullInitializerOnWindowedAggregate() {
-        groupedStream
-            .windowedBy(TimeWindows.of(ofMillis(10)))
-            .aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store"));
+        assertThrows(NullPointerException.class, () ->  groupedStream
+                .windowedBy(TimeWindows.of(ofMillis(10)))
+                .aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullAdderOnWindowedAggregate() {
-        groupedStream
-            .windowedBy(TimeWindows.of(ofMillis(10)))
-            .aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store"));
+        assertThrows(NullPointerException.class, () ->  groupedStream
+                .windowedBy(TimeWindows.of(ofMillis(10)))
+                .aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotHaveNullWindowsOnWindowedAggregate() {
-        groupedStream.windowedBy((Windows<?>) null);
+        assertThrows(NullPointerException.class, () ->  groupedStream.windowedBy((Windows<?>) null));
     }
 
-    @Test(expected = TopologyException.class)
+    @Test
     public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
+        assertThrows(TopologyException.class, () ->  groupedStream
+                .windowedBy(TimeWindows.of(ofMillis(10)))
+                .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
+    }
+
+    @Test
+    public void shouldNotHaveNullReducerWithSlidingWindowedReduce() {
+        assertThrows(NullPointerException.class, () ->  groupedStream
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .reduce(null, Materialized.as("store")));
+    }
+
+    @Test
+    public void shouldNotHaveNullWindowsWithSlidingWindowedReduce() {
+        assertThrows(NullPointerException.class, () ->  groupedStream.windowedBy((SlidingWindows) null));
+    }
+
+    @Test
+    public void shouldNotHaveInvalidStoreNameWithSlidingWindowedReduce() {
+        assertThrows(TopologyException.class, () ->  groupedStream
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
+    }
+
+    @Test
+    public void shouldNotHaveNullInitializerOnSlidingWindowedAggregate() {
+        assertThrows(NullPointerException.class, () ->  groupedStream
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
+    }
+
+    @Test
+    public void shouldNotHaveNullAdderOnSlidingWindowedAggregate() {
+        assertThrows(NullPointerException.class, () ->  groupedStream
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
+    }
+
+    @Test
+    public void shouldNotHaveInvalidStoreNameOnSlidingWindowedAggregate() {
+        assertThrows(TopologyException.class, () ->  groupedStream
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
+    }
+
+    @Test
+    public void shouldCountSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
         groupedStream
-            .windowedBy(TimeWindows.of(ofMillis(10)))
-            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME));
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
+                .count(Materialized.as("aggregate-by-key-windowed"))
+                .toStream()
+                .process(supplier);
+
+        doCountSlidingWindows(supplier);
+    }
+
+    @Test
+    public void shouldCountSlidingWindowsWithInternalStoreName() {
+        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        groupedStream
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
+                .count()
+                .toStream()
+                .process(supplier);
+
+        doCountSlidingWindows(supplier);
+    }
+
+    private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>, Long> supplier) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                    driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
+            inputTopic.pipeInput("1", "A", 500L);
+            inputTopic.pipeInput("1", "A", 999L);
+            inputTopic.pipeInput("1", "A", 600L);
+            inputTopic.pipeInput("2", "B", 500L);
+            inputTopic.pipeInput("2", "B", 600L);
+            inputTopic.pipeInput("2", "B", 700L);
+            inputTopic.pipeInput("3", "C", 501L);
+            inputTopic.pipeInput("1", "A", 1000L);
+            inputTopic.pipeInput("1", "A", 1000L);
+            inputTopic.pipeInput("2", "B", 1000L);
+            inputTopic.pipeInput("2", "B", 1000L);
+            inputTopic.pipeInput("3", "C", 600L);
+        }
+        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
+                // processing A@500
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
+                // processing A@999
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
+                // processing A@600
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
+                // processing B@500
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
+                // processing B@600
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
+                // processing B@700
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
+                // processing C@501
+                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
+                // processing first A@1000
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
+                // processing second A@1000
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
+                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
+                // processing first B@1000
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
+                // processing second B@1000
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
+                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
+                // processing C@600
+                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
+                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
+
+        )));
     }
 
     private void doAggregateSessionWindows(final MockProcessorSupplier<Windowed<String>, Integer> supplier) {
@@ -305,70 +438,61 @@ public class KGroupedStreamImplTest {
         assertNull(table.queryableStoreName());
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
-        groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(30)))
-            .reduce(null, Materialized.as("store"));
+        assertThrows(NullPointerException.class, () ->  groupedStream
+                .windowedBy(SessionWindows.with(ofMillis(30)))
+                .reduce(null, Materialized.as("store")));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() {
-        groupedStream.windowedBy((SessionWindows) null);
+        assertThrows(NullPointerException.class, () ->  groupedStream.windowedBy((SessionWindows) null));
     }
 
-    @Test(expected = TopologyException.class)
+    @Test
     public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
-        groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(30)))
-            .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
+        assertThrows(TopologyException.class, () ->  groupedStream
+                .windowedBy(SessionWindows.with(ofMillis(30)))
+                .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME))
+        );
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
-        groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(30)))
-            .reduce(
-                null,
-                Materialized.<String, String, SessionStore<Bytes, byte[]>>as(null));
+        assertThrows(NullPointerException.class, () ->  groupedStream
+                .windowedBy(SessionWindows.with(ofMillis(30)))
+                .reduce(null, Materialized.<String, String, SessionStore<Bytes, byte[]>>as(null))
+        );
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
-        groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(30)))
-            .aggregate(
-                null,
-                MockAggregator.TOSTRING_ADDER,
-                (aggKey, aggOne, aggTwo) -> null,
-                Materialized.as("storeName"));
+        assertThrows(NullPointerException.class, () ->  groupedStream
+                .windowedBy(SessionWindows.with(ofMillis(30)))
+                .aggregate(null, MockAggregator.TOSTRING_ADDER, (aggKey, aggOne, aggTwo) -> null, Materialized.as("storeName"))
+        );
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
-        groupedStream.
-            windowedBy(SessionWindows.with(ofMillis(30)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                null,
-                (aggKey, aggOne, aggTwo) -> null,
-                Materialized.as("storeName"));
+        assertThrows(NullPointerException.class, () -> groupedStream.
+                windowedBy(SessionWindows.with(ofMillis(30)))
+                .aggregate(MockInitializer.STRING_INIT, null, (aggKey, aggOne, aggTwo) -> null, Materialized.as("storeName"))
+        );
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
-        groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(30)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                null,
-                Materialized.as("storeName"));
+        assertThrows(NullPointerException.class, () ->  groupedStream
+                .windowedBy(SessionWindows.with(ofMillis(30)))
+                .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Materialized.as("storeName"))
+        );
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() {
-        groupedStream.windowedBy((SessionWindows) null);
+        assertThrows(NullPointerException.class, () ->  groupedStream.windowedBy((SessionWindows) null));
     }
 
     @Test
@@ -376,36 +500,33 @@ public class KGroupedStreamImplTest {
         groupedStream
             .windowedBy(SessionWindows.with(ofMillis(10)))
             .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                (aggKey, aggOne, aggTwo) -> null,
-                Materialized.with(Serdes.String(), Serdes.String()));
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    (aggKey, aggOne, aggTwo) -> null, Materialized.with(Serdes.String(), Serdes.String())
+            );
     }
 
-    @Test(expected = TopologyException.class)
+    @Test
     public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
-        groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(10)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                (aggKey, aggOne, aggTwo) -> null,
-                Materialized.as(INVALID_STORE_NAME));
+        assertThrows(TopologyException.class, () ->  groupedStream
+                .windowedBy(SessionWindows.with(ofMillis(10)))
+                .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (aggKey, aggOne, aggTwo) -> null, Materialized.as(INVALID_STORE_NAME))
+        );
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
-        groupedStream.reduce(MockReducer.STRING_ADDER, null);
+        assertThrows(NullPointerException.class, () ->  groupedStream.reduce(MockReducer.STRING_ADDER, null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
-        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null);
+        assertThrows(NullPointerException.class, () ->  groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
-        groupedStream.count((Materialized<String, Long, KeyValueStore<Bytes, byte[]>>) null);
+        assertThrows(NullPointerException.class, () ->  groupedStream.count((Materialized<String, Long, KeyValueStore<Bytes, byte[]>>) null));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
new file mode 100644
index 0000000..6bcb52c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -0,0 +1,692 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessor;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class KStreamSlidingWindowAggregateTest {
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private final String threadId = Thread.currentThread().getName();
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        table.toStream().process(supplier);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+            inputTopic.pipeInput("A", "1", 10L);
+            inputTopic.pipeInput("A", "2", 15L);
+            inputTopic.pipeInput("A", "3", 20L);
+            inputTopic.pipeInput("A", "4", 22L);
+            inputTopic.pipeInput("A", "5", 30L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
+            final Windowed<String> window = entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(0L, ValueAndTimestamp.make("0+1", 10L));
+        expected.put(5L, ValueAndTimestamp.make("0+1+2", 15L));
+        expected.put(10L, ValueAndTimestamp.make("0+1+2+3", 20L));
+        expected.put(11L, ValueAndTimestamp.make("0+2+3", 20L));
+        expected.put(12L, ValueAndTimestamp.make("0+2+3+4", 22L));
+        expected.put(16L, ValueAndTimestamp.make("0+3+4", 22L));
+        expected.put(20L, ValueAndTimestamp.make("0+3+4+5", 30L));
+        expected.put(21L, ValueAndTimestamp.make("0+4+5", 30L));
+        expected.put(23L, ValueAndTimestamp.make("0+5", 30L));
+
+        assertEquals(expected, actual);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReduceSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+            .reduce(
+                MockReducer.STRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        table.toStream().process(supplier);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+            inputTopic.pipeInput("A", "1", 10L);
+            inputTopic.pipeInput("A", "2", 14L);
+            inputTopic.pipeInput("A", "3", 15L);
+            inputTopic.pipeInput("A", "4", 22L);
+            inputTopic.pipeInput("A", "5", 26L);
+            inputTopic.pipeInput("A", "6", 30L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
+            final Windowed<String> window = entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(0L, ValueAndTimestamp.make("1", 10L));
+        expected.put(4L, ValueAndTimestamp.make("1+2", 14L));
+        expected.put(5L, ValueAndTimestamp.make("1+2+3", 15L));
+        expected.put(11L, ValueAndTimestamp.make("2+3", 15L));
+        expected.put(12L, ValueAndTimestamp.make("2+3+4", 22L));
+        expected.put(15L, ValueAndTimestamp.make("3+4", 22L));
+        expected.put(16L, ValueAndTimestamp.make("4+5", 26L));
+        expected.put(20L, ValueAndTimestamp.make("4+5+6", 30L));
+        expected.put(23L, ValueAndTimestamp.make("5+6", 30L));
+        expected.put(27L, ValueAndTimestamp.make("6", 30L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testAggregateLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
+                );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
+            inputTopic1.pipeInput("A", "1", 10L);
+            inputTopic1.pipeInput("A", "2", 20L);
+            inputTopic1.pipeInput("A", "3", 22L);
+            inputTopic1.pipeInput("A", "4", 15L);
+
+            inputTopic1.pipeInput("B", "1", 12L);
+            inputTopic1.pipeInput("B", "2", 13L);
+            inputTopic1.pipeInput("B", "3", 18L);
+            inputTopic1.pipeInput("B", "4", 19L);
+            inputTopic1.pipeInput("B", "5", 25L);
+            inputTopic1.pipeInput("B", "6", 14L);
+
+            inputTopic1.pipeInput("C", "1", 11L);
+            inputTopic1.pipeInput("C", "2", 15L);
+            inputTopic1.pipeInput("C", "3", 16L);
+            inputTopic1.pipeInput("C", "4", 21);
+            inputTopic1.pipeInput("C", "5", 23L);
+
+            inputTopic1.pipeInput("D", "4", 11L);
+            inputTopic1.pipeInput("D", "2", 12L);
+            inputTopic1.pipeInput("D", "3", 29L);
+            inputTopic1.pipeInput("D", "5", 16L);
+        }
+
+        assertEquals(
+                asList(
+                        // FINAL WINDOW: A@10 left window created when A@10 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 10),
+                        // A@10 right window created when A@20 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2", 20),
+                        // A@20 left window created when A@20 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2", 20),
+                        // FINAL WINDOW: A@20 right window created when A@22 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(21, 31)), "0+3", 22),
+                        // A@22 left window created when A@22 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3", 22),
+                        // FINAL WINDOW: A@20 left window updated when A@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2+4", 20),
+                        // FINAL WINDOW: A@10 right window updated when A@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2+4", 20),
+                        // FINAL WINDOW: A@22 left window updated when A@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3+4", 22),
+                        // FINAL WINDOW: A@15 left window created when A@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+4", 15),
+                        // FINAL WINDOW: A@15 right window created when A@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(16, 26)), "0+2+3", 22),
+
+                        // FINAL WINDOW: B@12 left window created when B@12 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(2, 12)), "0+1", 12),
+                        // B@12 right window created when B@13 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2", 13),
+                        // FINAL WINDOW: B@13 left window created when B@13 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(3, 13)), "0+1+2", 13),
+                        // B@12 right window updated when B@18 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3", 18),
+                        // B@13 right window created when B@18 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3", 18),
+                        // B@18 left window created when B@18 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3", 18),
+                        // B@12 right window updated when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4", 19),
+                        // B@13 right window updated when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4", 19),
+                        // B@18 right window created when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4", 19),
+                        // B@19 left window created when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4", 19),
+                        // FINAL WINDOW: B@18 right window updated when B@25 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4+5", 25),
+                        // FINAL WINDOW: B@19 right window updated when B@25 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)), "0+5", 25),
+                        // FINAL WINDOW: B@25 left window created when B@25 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15, 25)), "0+3+4+5", 25),
+                        // FINAL WINDOW: B@18 left window updated when B@14 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3+6", 18),
+                        // FINAL WINDOW: B@19 left window updated when B@14 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4+6", 19),
+                        // FINAL WINDOW: B@12 right window updated when B@14 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4+6", 19),
+                        // FINAL WINDOW: B@13 right window updated when B@14 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4+6", 19),
+                        // FINAL WINDOW: B@14 left window created when B@14 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(4, 14)), "0+1+2+6", 14),
+
+                        // FINAL WINDOW: C@11 left window created when C@11 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(1, 11)), "0+1", 11),
+                        // C@11 right window created when C@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2", 15),
+                        // FINAL WINDOW: C@15 left window created when C@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+1+2", 15),
+                        // C@11 right window updated when C@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3", 16),
+                        // C@15 right window created when C@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3", 16),
+                        // FINAL WINDOW: C@16 left window created when C@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(6, 16)), "0+1+2+3", 16),
+                        // FINAL WINDOW: C@11 right window updated when C@21 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3+4", 21),
+                        // C@15 right window updated when C@21 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4", 21),
+                        // C@16 right window created when C@21 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4", 21),
+                        // FINAL WINDOW: C@21 left window created when C@21 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(11, 21)), "0+1+2+3+4", 21),
+                        // FINAL WINDOW: C@15 right window updated when C@23 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4+5", 23),
+                        // FINAL WINDOW: C@16 right window updated when C@23 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4+5", 23),
+                        // FINAL WINDOW: C@21 right window created when C@23 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(22, 32)), "0+5", 23),
+                        // FINAL WINDOW: C@23 left window created when C@23 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+2+3+4+5", 23),
+
+                        // FINAL WINDOW: D@11 left window created when D@11 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(1, 11)), "0+4", 11),
+                        // D@11 right window created when D@12 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2", 12),
+                        // FINAL WINDOW: D@12 left window created when D@12 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(2, 12)), "0+4+2", 12),
+                        // FINAL WINDOW: D@29 left window created when D@29 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(19, 29)), "0+3", 29),
+                        // FINAL WINDOW: D@11 right window updated when D@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2+5", 16),
+                        // FINAL WINDOW: D@12 right window created when D@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(13, 23)), "0+5", 16),
+                        // FINAL WINDOW: D@16 left window created when D@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(6, 16)), "0+4+2+5", 16)
+                        ),
+                supplier.theCapturedProcessor().processed()
+        );
+    }
+
+    @Test
+    public void testJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+        final String topic2 = "topic2";
+
+        final KTable<Windowed<String>, String> table1 = builder
+                .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
+                );
+
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        table1.toStream().process(supplier);
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String())
+                );
+        table2.toStream().process(supplier);
+
+        table1.join(table2, (p1, p2) -> p1 + "%" + p2).toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> inputTopic2 =
+                    driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer());
+            inputTopic1.pipeInput("A", "1", 10L);
+            inputTopic1.pipeInput("B", "2", 11L);
+            inputTopic1.pipeInput("C", "3", 12L);
+
+            final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
+
+            processors.get(0).checkAndClearProcessResult(
+                    // left windows created by the first set of records to table 1
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),  "0+1",  10),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(1, 11)),  "0+2",  11),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2, 12)),  "0+3",  12)
+            );
+            processors.get(1).checkAndClearProcessResult();
+            processors.get(2).checkAndClearProcessResult();
+
+            inputTopic1.pipeInput("A", "1", 15L);
+            inputTopic1.pipeInput("B", "2", 16L);
+            inputTopic1.pipeInput("C", "3", 19L);
+
+            processors.get(0).checkAndClearProcessResult(
+                    // right windows from previous records are created, and left windows from new records to table 1
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)),  "0+1",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)),  "0+1+1",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(12, 22)),  "0+2",  16),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6, 16)),  "0+2+2",  16),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)),  "0+3",  19),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(9, 19)),  "0+3+3",  19)
+            );
+            processors.get(1).checkAndClearProcessResult();
+            processors.get(2).checkAndClearProcessResult();
+
+            inputTopic2.pipeInput("A", "a", 10L);
+            inputTopic2.pipeInput("B", "b", 30L);
+            inputTopic2.pipeInput("C", "c", 12L);
+            inputTopic2.pipeInput("C", "c", 35L);
+
+
+            processors.get(0).checkAndClearProcessResult();
+            processors.get(1).checkAndClearProcessResult(
+                    // left windows from first set of records sent to table 2
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),  "0+a",  10),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)),  "0+b",  30),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2, 12)),  "0+c",  12),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25, 35)),  "0+c",  35)
+            );
+            processors.get(2).checkAndClearProcessResult(
+                    // set of join windows from windows created by table 1 and table 2
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),  "0+1%0+a",  10),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2, 12)),  "0+3%0+c",  12)
+            );
+
+            inputTopic2.pipeInput("A", "a", 15L);
+            inputTopic2.pipeInput("B", "b", 16L);
+            inputTopic2.pipeInput("C", "c", 17L);
+
+            processors.get(0).checkAndClearProcessResult();
+            processors.get(1).checkAndClearProcessResult(
+                    // right windows from previous records are created (where applicable), and left windows from new records to table 2
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)),  "0+a",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)),  "0+a+a",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6, 16)),  "0+b",  16),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)),  "0+c",  17),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(7, 17)),  "0+c+c",  17)
+            );
+            processors.get(2).checkAndClearProcessResult(
+                    // set of join windows from windows created by table 1 and table 2
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)),  "0+1%0+a",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)),  "0+1+1%0+a+a",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6, 16)),  "0+2+2%0+b",  16),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)),  "0+3%0+c",  19)
+            );
+        }
+    }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingNullKey() {
+        final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .aggregate(MockInitializer.STRING_INIT, MockAggregator.toStringInstance("+"), Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()));
+
+        props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSlidingWindowAggregate.class);
+             final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                    driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+            inputTopic.pipeInput(null, "1");
+            assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. value=[1] topic=[topic] partition=[0] offset=[0]"));
+        }
+    }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
+        final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
+        stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(90L)))
+                .aggregate(
+                        () -> "",
+                        MockAggregator.toStringInstance("+"),
+                        Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+                )
+                .toStream()
+                .map((key, value) -> new KeyValue<>(key.toString(), value))
+                .to("output");
+
+        props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSlidingWindowAggregate.class);
+             final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+
+            final TestInputTopic<String, String> inputTopic =
+                    driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+            inputTopic.pipeInput("k", "100", 200L);
+            inputTopic.pipeInput("k", "0", 100L);
+            inputTopic.pipeInput("k", "1", 101L);
+            inputTopic.pipeInput("k", "2", 102L);
+            inputTopic.pipeInput("k", "3", 103L);
+            inputTopic.pipeInput("k", "4", 104L);
+            inputTopic.pipeInput("k", "5", 105L);
+            inputTopic.pipeInput("k", "6", 15L);
+
+            assertLatenessMetrics(driver, is(7.0), is(185.0), is(96.25));
+
+            assertThat(appender.getMessages(), hasItems(
+                    // left window for k@100
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[90,100] expiration=[110] streamTime=[200]",
+                    // left window for k@101
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[91,101] expiration=[110] streamTime=[200]",
+                    // left window for k@102
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[92,102] expiration=[110] streamTime=[200]",
+                    // left window for k@103
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[93,103] expiration=[110] streamTime=[200]",
+                    // left window for k@104
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[94,104] expiration=[110] streamTime=[200]",
+                    // left window for k@105
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[95,105] expiration=[110] streamTime=[200]",
+                    // left window for k@15
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] streamTime=[200]"
+            ));
+            final TestOutputTopic<String, String> outputTopic =
+                    driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
+            assertThat(outputTopic.readRecord(), equalTo(new TestRecord<>("[k@190/200]", "+100", null, 200L)));
+            assertTrue(outputTopic.isEmpty());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateRandomInput() {
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTable<Windowed<String>, String> table = builder
+            .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(10000)))
+            // The aggregator needs to sort the strings so the window value is the same for the final windows even when
+            // records are processed in a different order. Here, we sort alphabetically.
+            .aggregate(
+                () -> "",
+                (key, value, aggregate) -> {
+                    aggregate += value;
+                    final char[] ch = aggregate.toCharArray();
+                    Arrays.sort(ch);
+                    aggregate = String.valueOf(ch);
+                    return aggregate;
+                },
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        table.toStream().process(supplier);
+        final long seed = new Random().nextLong();
+        final Random shuffle = new Random(seed);
+
+        try {
+
+            final List<ValueAndTimestamp<String>> input = Arrays.asList(
+                ValueAndTimestamp.make("A", 10L),
+                ValueAndTimestamp.make("B", 15L),
+                ValueAndTimestamp.make("C", 16L),
+                ValueAndTimestamp.make("D", 18L),
+                ValueAndTimestamp.make("E", 30L),
+                ValueAndTimestamp.make("F", 40L),
+                ValueAndTimestamp.make("G", 55L),
+                ValueAndTimestamp.make("H", 56L),
+                ValueAndTimestamp.make("I", 58L),
+                ValueAndTimestamp.make("J", 58L),
+                ValueAndTimestamp.make("K", 62L),
+                ValueAndTimestamp.make("L", 63L),
+                ValueAndTimestamp.make("M", 63L),
+                ValueAndTimestamp.make("N", 63L),
+                ValueAndTimestamp.make("O", 76L),
+                ValueAndTimestamp.make("P", 77L),
+                ValueAndTimestamp.make("Q", 80L)
+            );
+
+            Collections.shuffle(input, shuffle);
+            try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+                final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
+                for (int i = 0; i < input.size(); i++) {
+                    inputTopic1.pipeInput("A", input.get(i).value(), input.get(i).timestamp());
+                }
+            }
+
+            final Map<Long, ValueAndTimestamp<String>> results = new HashMap<>();
+
+            for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
+                final Windowed<String> window = entry.key();
+                final Long start = window.window().start();
+                final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+                if (results.putIfAbsent(start, valueAndTimestamp) != null) {
+                    results.replace(start, valueAndTimestamp);
+                }
+            }
+            verifyRandomTestResults(results);
+        } catch (final AssertionError t) {
+            throw new AssertionError(
+                "Assertion failed in randomized test. Reproduce with seed: " + seed + ".",
+                t
+            );
+        } catch (final Throwable t) {
+            final StringBuilder sb =
+                new StringBuilder()
+                    .append("Exception in randomized scenario. Reproduce with seed: ")
+                    .append(seed)
+                    .append(".");
+            throw new AssertionError(sb.toString(), t);
+        }
+    }
+
+    private void verifyRandomTestResults(final Map<Long, ValueAndTimestamp<String>> actual) {
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(0L, ValueAndTimestamp.make("A", 10L));
+        expected.put(5L, ValueAndTimestamp.make("AB", 15L));
+        expected.put(6L, ValueAndTimestamp.make("ABC", 16L));
+        expected.put(8L, ValueAndTimestamp.make("ABCD", 18L));
+        expected.put(11L, ValueAndTimestamp.make("BCD", 18L));
+        expected.put(16L, ValueAndTimestamp.make("CD", 18L));
+        expected.put(17L, ValueAndTimestamp.make("D", 18L));
+        expected.put(20L, ValueAndTimestamp.make("E", 30L));
+        expected.put(30L, ValueAndTimestamp.make("EF", 40L));
+        expected.put(31L, ValueAndTimestamp.make("F", 40L));
+        expected.put(45L, ValueAndTimestamp.make("G", 55L));
+        expected.put(46L, ValueAndTimestamp.make("GH", 56L));
+        expected.put(48L, ValueAndTimestamp.make("GHIJ", 58L));
+        expected.put(52L, ValueAndTimestamp.make("GHIJK", 62L));
+        expected.put(53L, ValueAndTimestamp.make("GHIJKLMN", 63L));
+        expected.put(56L, ValueAndTimestamp.make("HIJKLMN", 63L));
+        expected.put(57L, ValueAndTimestamp.make("IJKLMN", 63L));
+        expected.put(59L, ValueAndTimestamp.make("KLMN", 63L));
+        expected.put(63L, ValueAndTimestamp.make("LMN", 63L));
+        expected.put(66L, ValueAndTimestamp.make("O", 76L));
+        expected.put(67L, ValueAndTimestamp.make("OP", 77L));
+        expected.put(70L, ValueAndTimestamp.make("OPQ", 80L));
+        expected.put(77L, ValueAndTimestamp.make("PQ", 80L));
+        expected.put(78L, ValueAndTimestamp.make("Q", 80L));
+
+        assertEquals(expected, actual);
+
+    }
+
+    private void assertLatenessMetrics(final TopologyTestDriver driver,
+                                       final Matcher<Object> dropTotal,
+                                       final Matcher<Object> maxLateness,
+                                       final Matcher<Object> avgLateness) {
+
+        final MetricName dropTotalMetric;
+        final MetricName dropRateMetric;
+        final MetricName latenessMaxMetric;
+        final MetricName latenessAvgMetric;
+        dropTotalMetric = new MetricName(
+                "dropped-records-total",
+                "stream-task-metrics",
+                "The total number of dropped records",
+                mkMap(
+                        mkEntry("thread-id", threadId),
+                        mkEntry("task-id", "0_0")
+                )
+        );
+        dropRateMetric = new MetricName(
+                "dropped-records-rate",
+                "stream-task-metrics",
+                "The average number of dropped records per second",
+                mkMap(
+                        mkEntry("thread-id", threadId),
+                        mkEntry("task-id", "0_0")
+                )
+        );
+        latenessMaxMetric = new MetricName(
+                "record-lateness-max",
+                "stream-task-metrics",
+                "The observed maximum lateness of records in milliseconds, measured by comparing the record "
+                        + "timestamp with the current stream time",
+                mkMap(
+                        mkEntry("thread-id", threadId),
+                        mkEntry("task-id", "0_0")
+                )
+        );
+        latenessAvgMetric = new MetricName(
+                "record-lateness-avg",
+                "stream-task-metrics",
+                "The observed average lateness of records in milliseconds, measured by comparing the record "
+                        + "timestamp with the current stream time",
+                mkMap(
+                        mkEntry("thread-id", threadId),
+                        mkEntry("task-id", "0_0")
+                )
+        );
+        assertThat(driver.metrics().get(dropTotalMetric).metricValue(), dropTotal);
+        assertThat(driver.metrics().get(dropRateMetric).metricValue(), not(0.0));
+        assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
+        assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
new file mode 100644
index 0000000..41ac960
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import static java.time.Duration.ofMillis;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import java.util.Properties;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.CogroupedKStream;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SlidingWindowedCogroupedKStreamImplTest {
+
+    private static final String TOPIC = "topic";
+    private static final String TOPIC2 = "topic2";
+    private static final String OUTPUT = "output";
+    private final StreamsBuilder builder = new StreamsBuilder();
+
+    private KGroupedStream<String, String> groupedStream;
+
+    private KGroupedStream<String, String> groupedStream2;
+    private CogroupedKStream<String, String> cogroupedStream;
+    private TimeWindowedCogroupedKStream<String, String> windowedCogroupedStream;
+
+    private final Properties props = StreamsTestUtils
+            .getStreamsConfig(Serdes.String(), Serdes.String());
+
+    @Before
+    public void setup() {
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed
+                .with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream(TOPIC2, Consumed
+                .with(Serdes.String(), Serdes.String()));
+
+        groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
+        groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
+        cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
+                .cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
+        windowedCogroupedStream = cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)));
+    }
+
+    @Test
+    public void shouldNotHaveNullInitializerOnAggregate() {
+        assertThrows(NullPointerException.class, () ->  windowedCogroupedStream.aggregate(null));
+    }
+
+    @Test
+    public void shouldNotHaveNullMaterializedOnTwoOptionAggregate() {
+        assertThrows(NullPointerException.class, () ->  windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, (Materialized<String, String, WindowStore<Bytes, byte[]>>) null));
+    }
+
+    @Test
+    public void shouldNotHaveNullNamedTwoOptionOnAggregate() {
+        assertThrows(NullPointerException.class, () ->  windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, (Named) null));
+    }
+
+    @Test
+    public void shouldNotHaveNullInitializerTwoOptionNamedOnAggregate() {
+        assertThrows(NullPointerException.class, () ->  windowedCogroupedStream.aggregate(null, Named.as("test")));
+    }
+
+    @Test
+    public void shouldNotHaveNullInitializerTwoOptionMaterializedOnAggregate() {
+        assertThrows(NullPointerException.class, () ->  windowedCogroupedStream.aggregate(null, Materialized.as("test")));
+    }
+
+    @Test
+    public void shouldNotHaveNullInitializerThreeOptionOnAggregate() {
+        assertThrows(NullPointerException.class, () ->  windowedCogroupedStream.aggregate(null, Named.as("test"), Materialized.as("test")));
+    }
+
+    @Test
+    public void shouldNotHaveNullMaterializedOnAggregate() {
+        assertThrows(NullPointerException.class, () ->  windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, Named.as("Test"), null));
+    }
+
+    @Test
+    public void shouldNotHaveNullNamedOnAggregate() {
+        assertThrows(NullPointerException.class, () ->  windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("test")));
+    }
+
+    @Test
+    public void namedParamShouldSetName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed
+                .with(Serdes.String(), Serdes.String()));
+        groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
+        groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
+                .aggregate(MockInitializer.STRING_INIT, Named.as("foo"));
+
+        assertThat(builder.build().describe().toString(), equalTo(
+                "Topologies:\n" +
+                        "   Sub-topology: 0\n" +
+                        "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" +
+                        "      --> foo-cogroup-agg-0\n" +
+                        "    Processor: foo-cogroup-agg-0 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000001])\n" +
+                        "      --> foo-cogroup-merge\n" +
+                        "      <-- KSTREAM-SOURCE-0000000000\n" +
+                        "    Processor: foo-cogroup-merge (stores: [])\n" +
+                        "      --> none\n" +
+                        "      <-- foo-cogroup-agg-0\n\n"));
+    }
+
+    @Test
+    public void slidingWindowAggregateTestStreamsTest() {
+        final KTable<Windowed<String>, String> customers = windowedCogroupedStream.aggregate(
+                MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String()));
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic = driver.createInputTopic(
+                    TOPIC, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
+                    OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());
+
+            testInputTopic.pipeInput("k1", "A", 500);
+            testInputTopic.pipeInput("k2", "A", 500);
+            testInputTopic.pipeInput("k2", "A", 501);
+            testInputTopic.pipeInput("k1", "A", 502);
+            testInputTopic.pipeInput("k1", "B", 503);
+            testInputTopic.pipeInput("k2", "B", 503);
+            testInputTopic.pipeInput("k2", "B", 504);
+            testInputTopic.pipeInput("k1", "B", 504);
+
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 501);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+A", 501);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 502);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+A", 502);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+B", 503);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B", 503);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+A+B", 503);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+B", 503);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B", 503);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+A+B", 503);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+B+B", 504);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B+B", 504);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B", 504);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+A+B+B", 504);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+B+B", 504);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B+B", 504);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B", 504);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+A+B+B", 504);
+        }
+    }
+
+    @Test
+    public void slidingWindowAggregateOverlappingWindowsTest() {
+
+        final KTable<Windowed<String>, String> customers = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L))).aggregate(
+                        MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String()));
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic = driver.createInputTopic(
+                    TOPIC, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
+                    OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());
+
+            testInputTopic.pipeInput("k1", "A", 500);
+            testInputTopic.pipeInput("k2", "A", 500);
+            testInputTopic.pipeInput("k1", "B", 750);
+            testInputTopic.pipeInput("k2", "B", 750);
+            testInputTopic.pipeInput("k2", "A", 1000L);
+            testInputTopic.pipeInput("k1", "A", 1000L);
+
+            // left window k1@500
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 500);
+            // left window k2@500
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 500);
+            // right window k1@500
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B", 750);
+            // left window k1@750
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+B", 750);
+            // right window k2@500
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B", 750);
+            // left window k2@750
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+B", 750);
+            // right window k2@500 update
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B+A", 1000);
+            // right window k2@750
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 1000);
+            // left window k2@1000
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+B+A", 1000);
+            // right window k1@500 update
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B+A", 1000);
+            // right window k1@750
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 1000);
+            // left window k1@1000
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+B+A", 1000);
+        }
+    }
+
+    private void assertOutputKeyValueTimestamp(final TestOutputTopic<Windowed<String>, String> outputTopic,
+                                               final String expectedKey,
+                                               final String expectedValue,
+                                               final long expectedTimestamp) {
+        final TestRecord<Windowed<String>, String> realRecord = outputTopic.readRecord();
+        final TestRecord<String, String> nonWindowedRecord = new TestRecord<>(
+                realRecord.getKey().key(), realRecord.getValue(), null, realRecord.timestamp());
+        final TestRecord<String, String> testRecord = new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp);
+        assertThat(nonWindowedRecord, equalTo(testRecord));
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
new file mode 100644
index 0000000..d6b26bf
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class SlidingWindowedKStreamImplTest {
+
+    private static final String TOPIC = "input";
+    private final StreamsBuilder builder = new StreamsBuilder();
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private TimeWindowedKStream<String, String> windowedStream;
+
+    @Before
+    public void before() {
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        windowedStream = stream.
+            groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L)));
+    }
+
+    @Test
+    public void shouldCountSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        windowedStream
+            .count()
+            .toStream()
+            .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+            equalTo(ValueAndTimestamp.make(1L, 100L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+            equalTo(ValueAndTimestamp.make(1L, 150L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+            equalTo(ValueAndTimestamp.make(2L, 150L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+            equalTo(ValueAndTimestamp.make(1L, 500L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+            equalTo(ValueAndTimestamp.make(2L, 200L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("2", new TimeWindow(50L, 150L))),
+            equalTo(ValueAndTimestamp.make(1L, 150L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("2", new TimeWindow(151L, 251L))),
+            equalTo(ValueAndTimestamp.make(1L, 200L)));
+    }
+
+    @Test
+    public void shouldReduceSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        windowedStream
+            .reduce(MockReducer.STRING_ADDER)
+            .toStream()
+            .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+            equalTo(ValueAndTimestamp.make("1", 100L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+            equalTo(ValueAndTimestamp.make("2", 150L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+            equalTo(ValueAndTimestamp.make("1+2", 150L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+            equalTo(ValueAndTimestamp.make("3", 500L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+            equalTo(ValueAndTimestamp.make("10+20", 200L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("2", new TimeWindow(50L, 150L))),
+            equalTo(ValueAndTimestamp.make("20", 150L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("2", new TimeWindow(151L, 251L))),
+            equalTo(ValueAndTimestamp.make("10", 200L)));
+    }
+
+    @Test
+    public void shouldAggregateSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        windowedStream
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.with(Serdes.String(), Serdes.String()))
+            .toStream()
+            .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+            equalTo(ValueAndTimestamp.make("0+1", 100L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+            equalTo(ValueAndTimestamp.make("0+2", 150L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+            equalTo(ValueAndTimestamp.make("0+1+2", 150L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+            equalTo(ValueAndTimestamp.make("0+3", 500L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+            equalTo(ValueAndTimestamp.make("0+10+20", 200L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("2", new TimeWindow(50L, 150L))),
+            equalTo(ValueAndTimestamp.make("0+20", 150L)));
+        assertThat(
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
+                .get(new Windowed<>("2", new TimeWindow(151L, 251L))),
+            equalTo(ValueAndTimestamp.make("0+10", 200L)));
+    }
+
+    @Test
+    public void shouldMaterializeCount() {
+        windowedStream.count(
+            Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.Long()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            {
+                final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store");
+                final List<KeyValue<Windowed<String>, Long>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), 1L),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), 2L),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), 1L),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), 1L),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), 1L),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), 2L),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), 1L))));
+            }
+            {
+                final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
+                    driver.getTimestampedWindowStore("count-store");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), ValueAndTimestamp.make(1L, 100L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), ValueAndTimestamp.make(2L, 150L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), ValueAndTimestamp.make(1L, 150L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), ValueAndTimestamp.make(1L, 500L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), ValueAndTimestamp.make(1L, 150L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), ValueAndTimestamp.make(2L, 200L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), ValueAndTimestamp.make(1L, 200L)))));            }
+        }
+    }
+
+    @Test
+    public void shouldMaterializeReduced() {
+        windowedStream.reduce(
+            MockReducer.STRING_ADDER,
+            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            {
+                final WindowStore<String, String> windowStore = driver.getWindowStore("reduced");
+                final List<KeyValue<Windowed<String>, String>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), "1"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), "1+2"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), "2"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), "3"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), "20"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), "10+20"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), "10"))));
+            }
+            {
+                final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
+                    driver.getTimestampedWindowStore("reduced");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), ValueAndTimestamp.make("1", 100L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), ValueAndTimestamp.make("1+2", 150L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), ValueAndTimestamp.make("2", 150L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), ValueAndTimestamp.make("3", 500L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), ValueAndTimestamp.make("20", 150L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), ValueAndTimestamp.make("10+20", 200L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), ValueAndTimestamp.make("10", 200L)))));
+            }
+        }
+    }
+
+    @Test
+    public void shouldMaterializeAggregated() {
+        windowedStream.aggregate(
+            MockInitializer.STRING_INIT,
+            MockAggregator.TOSTRING_ADDER,
+            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            {
+                final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
+                final List<KeyValue<Windowed<String>, String>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), "0+1"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), "0+1+2"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), "0+2"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), "0+3"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), "0+20"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), "0+10+20"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), "0+10"))));
+            }
+            {
+                final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
+                    driver.getTimestampedWindowStore("aggregated");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), ValueAndTimestamp.make("0+1", 100L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), ValueAndTimestamp.make("0+1+2", 150L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), ValueAndTimestamp.make("0+2", 150L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), ValueAndTimestamp.make("0+3", 500L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), ValueAndTimestamp.make("0+20", 150L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), ValueAndTimestamp.make("0+10+20", 200L)),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), ValueAndTimestamp.make("0+10", 200L)))));
+            }
+        }
+    }
+
+    @Test
+    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+        assertThrows(NullPointerException.class, () -> windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER));
+    }
+
+    @Test
+    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+        assertThrows(NullPointerException.class, () -> windowedStream.aggregate(MockInitializer.STRING_INIT, null));
+    }
+
+    @Test
+    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+        assertThrows(NullPointerException.class, () -> windowedStream.reduce(null));
+    }
+
+    @Test
+    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+        assertThrows(NullPointerException.class, () -> windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
+    }
+
+    @Test
+    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+        assertThrows(NullPointerException.class, () -> windowedStream.aggregate(
+            MockInitializer.STRING_INIT,
+            null,
+            Materialized.as("store")));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+        assertThrows(NullPointerException.class, () -> windowedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized) null));
+    }
+
+    @Test
+    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+        assertThrows(NullPointerException.class, () -> windowedStream.reduce(null, Materialized.as("store")));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+        assertThrows(NullPointerException.class, () -> windowedStream.reduce(MockReducer.STRING_ADDER, (Materialized) null));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
+        assertThrows(NullPointerException.class, () -> windowedStream.reduce(MockReducer.STRING_ADDER, (Named) null));
+    }
+
+    @Test
+    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+        assertThrows(NullPointerException.class, () -> windowedStream.count((Materialized<String, Long, WindowStore<Bytes, byte[]>>) null));
+    }
+
+    @Test
+    public void shouldThrowIllegalArgumentWhenRetentionIsTooSmall() {
+        assertThrows(IllegalArgumentException.class, () -> windowedStream
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized
+                    .<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
+                    .withRetention(ofMillis(1L))
+            )
+        );
+    }
+
+    @Test
+    public void shouldDropWindowsOutsideOfRetention() {
+        final WindowBytesStoreSupplier storeSupplier = Stores.inMemoryWindowStore("aggregated", ofMillis(1200L), ofMillis(100L), false);
+        windowedStream.aggregate(
+            MockInitializer.STRING_INIT,
+            MockAggregator.TOSTRING_ADDER,
+            Materialized.<String, String>as(storeSupplier)
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String())
+                .withCachingDisabled());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
+
+            inputTopic.pipeInput("1", "2", 100L);
+            inputTopic.pipeInput("1", "3", 500L);
+            inputTopic.pipeInput("1", "4", 799L);
+            inputTopic.pipeInput("1", "4", 1000L);
+            inputTopic.pipeInput("1", "5", 2000L);
+
+            {
+                final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
+                final List<KeyValue<Windowed<String>, String>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "1", ofEpochMilli(0), ofEpochMilli(10000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(900, 1000)), "0+4"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(1900, 2000)), "0+5"))));
+            }
+            {
+                final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
+                    driver.getTimestampedWindowStore("aggregated");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
+                    StreamsTestUtils.toList(windowStore.fetch("1", "1", ofEpochMilli(0), ofEpochMilli(2000L)));
+                assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(900, 1000)), ValueAndTimestamp.make("0+4", 1000L)),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(1900, 2000)), ValueAndTimestamp.make("0+5", 2000L)))));
+            }
+        }
+    }
+
+    private void processData(final TopologyTestDriver driver) {
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
+        inputTopic.pipeInput("1", "1", 100L);
+        inputTopic.pipeInput("1", "2", 150L);
+        inputTopic.pipeInput("1", "3", 500L);
+        inputTopic.pipeInput("2", "10", 200L);
+        inputTopic.pipeInput("2", "20", 150L);
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 29eb539..a65c481 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -460,6 +461,89 @@ public class SuppressScenarioTest {
     }
 
     @Test
+    public void shouldSupportFinalResultsForSlidingWindows() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<Windowed<String>, Long> valueCounts = builder
+                .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
+                .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(5L), ofMillis(15L)))
+                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled().withKeySerde(STRING_SERDE));
+        valueCounts
+                .suppress(untilWindowCloses(unbounded()))
+                .toStream()
+                .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
+                .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
+        valueCounts
+                .toStream()
+                .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
+                .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
+        final Topology topology = builder.build();
+        System.out.println(topology.describe());
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
+            final TestInputTopic<String, String> inputTopic =
+                    driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);
+            inputTopic.pipeInput("k1", "v1", 10L);
+            inputTopic.pipeInput("k1", "v1", 11L);
+            inputTopic.pipeInput("k1", "v1", 10L);
+            inputTopic.pipeInput("k1", "v1", 13L);
+            inputTopic.pipeInput("k1", "v1", 10L);
+            inputTopic.pipeInput("k1", "v1", 24L);
+            // this update should get dropped, since the previous event advanced the stream time and closed the window.
+            inputTopic.pipeInput("k1", "v1", 5L);
+            inputTopic.pipeInput("k1", "v1", 7L);
+            // final record to advance stream time and flush windows
+            inputTopic.pipeInput("k1", "v1", 90L);
+            verify(
+                    drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                    asList(
+                            // left window for k1@10 created when k1@10 is processed
+                            new KeyValueTimestamp<>("[k1@5/10]", 1L, 10L),
+                            // right window for k1@10 created when k1@11 is processed
+                            new KeyValueTimestamp<>("[k1@11/16]", 1L, 11L),
+                            // left window for k1@11 created when k1@11 is processed
+                            new KeyValueTimestamp<>("[k1@6/11]", 2L, 11L),
+                            // left window for k1@10 updated when k1@10 is processed
+                            new KeyValueTimestamp<>("[k1@5/10]", 2L, 10L),
+                            // left window for k1@11 updated when k1@10 is processed
+                            new KeyValueTimestamp<>("[k1@6/11]", 3L, 11L),
+                            // right window for k1@10 updated when k1@13 is processed
+                            new KeyValueTimestamp<>("[k1@11/16]", 2L, 13L),
+                            // right window for k1@11 created when k1@13 is processed
+                            new KeyValueTimestamp<>("[k1@12/17]", 1L, 13L),
+                            // left window for k1@13 created when k1@13 is processed
+                            new KeyValueTimestamp<>("[k1@8/13]", 4L, 13L),
+                            // left window for k1@10 updated when k1@10 is processed
+                            new KeyValueTimestamp<>("[k1@5/10]", 3L, 10L),
+                            // left window for k1@11 updated when k1@10 is processed
+                            new KeyValueTimestamp<>("[k1@6/11]", 4L, 11L),
+                            // left window for k1@13 updated when k1@10 is processed
+                            new KeyValueTimestamp<>("[k1@8/13]", 5L, 13L),
+                            // left window for k1@24 created when k1@24 is processed
+                            new KeyValueTimestamp<>("[k1@19/24]", 1L, 24L),
+                            // left window for k1@10 updated when k1@5 is processed
+                            new KeyValueTimestamp<>("[k1@5/10]", 4L, 10L),
+                            // left window for k1@10 updated when k1@7 is processed
+                            new KeyValueTimestamp<>("[k1@5/10]", 5L, 10L),
+                            // left window for k1@11 updated when k1@7 is processed
+                            new KeyValueTimestamp<>("[k1@6/11]", 5L, 11L),
+                            new KeyValueTimestamp<>("[k1@85/90]", 1L, 90L)
+                            )
+            );
+            verify(
+                    drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                    asList(
+                            new KeyValueTimestamp<>("[k1@5/10]", 5L, 10L),
+                            new KeyValueTimestamp<>("[k1@6/11]", 5L, 11L),
+                            new KeyValueTimestamp<>("[k1@8/13]", 5L, 13L),
+                            new KeyValueTimestamp<>("[k1@11/16]", 2L, 13L),
+                            new KeyValueTimestamp<>("[k1@12/17]", 1L, 13L),
+                            new KeyValueTimestamp<>("[k1@19/24]", 1L, 24L)
+                            )
+            );
+        }
+    }
+
+    @Test
     public void shouldSupportFinalResultsForSessionWindows() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Windowed<String>, Long> valueCounts = builder