You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/30 07:37:12 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

ableegoldman commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r462626773



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
+/**
+ /**
+ * A sliding window used for aggregating events.
+ * <p>
+ * Sliding Windows are defined based on a record's timestamp, window size based on the given maximum time difference (inclusive) between
+ * records in the same window and given window grace period.
+ *
+ * While the window is sliding over the input data stream, a new window is created each time a record enters
+ * the sliding window or a record drops out of the sliding window.
+ *
+ * Records that come after set grace period will be ignored, i.e., a window is closed when
+ * {@code stream-time > window-end + grace-period}.
+ * <p>
+ * For example, if we have a time difference of 5000ms and the following data arrives:
+ * <pre>
+ * +--------------------------------------+
+ * |    key    |    value    |    time    |
+ * +-----------+-------------+------------+
+ * |    A      |     1       |    8000    |
+ * +-----------+-------------+------------+
+ * |    A      |     2       |    9200    |
+ * +-----------+-------------+------------+
+ * |    A      |     3       |    12400   |
+ * +-----------+-------------+------------+
+ * </pre>
+ * We'd have the following 5 windows:
+ * - window {@code [3000;8000]} contains [1] (created when first record enters the window)
+ * - window {@code [4200;9200]} contains [1,2] (created when second record enters the window)
+ * - window {@code [7400;124000]} contains [1,2,3] (created when third record enters the window)
+ * - window {@code [8001;130001]} contains [2,3] (created when the first record drops out of the window)
+ * - window {@code [9201;142001]} contains [3] (created when the second record drops out of the window)
+ *
+ * Note that while SlidingWindows are of a fixed size {@link TimeWindows}, the start and end points
+ * depend on when events are processed, similar to {@link SessionWindows}.
+ * <p>
+ * For time semantics, see {@link TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see SessionWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see KGroupedStream#windowedBy(Windows)
+ * @see TimestampExtractor
+ */
+
+public final class SlidingWindows {
+
+
+    /** The size of the windows in milliseconds, defined by the max time difference between records. */
+    public final long timeDifference;
+
+    /** The grace period in milliseconds. */
+    private final long graceMs;
+
+    private SlidingWindows(final long timeDifference, final long graceMs) {
+        this.timeDifference = timeDifference;
+        this.graceMs = graceMs;
+
+    }
+
+    /**
+     * Return a window definition with the window size based on the given maximum time difference (inclusive) between
+     * records in the same window and given window grace period.
+     *
+     * Reject out-of-order events that arrive after {@code grace}. A window is closed when {@code stream-time > window-end + grace-period}.
+     *
+     * @param timeDifference the max time difference (inclusive) between two records in a window
+     * @param grace the grace period to admit out-of-order events to a window
+     * @return a new window definition
+     * @throws IllegalArgumentException if the specified window size or grace is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference, final Duration grace) throws IllegalArgumentException {
+        final String msgPrefixSize = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        final long timeDifferenceMs = ApiUtils.validateMillisecondDuration(timeDifference, msgPrefixSize);
+        if (timeDifferenceMs <= 0) {
+            throw new IllegalArgumentException("Window timeDifference (timeDifference) must be larger than zero.");
+        }
+        final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "afterWindowEnd");
+        final long graceMs = ApiUtils.validateMillisecondDuration(grace, msgPrefixGrace);
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
+        return new SlidingWindows(timeDifferenceMs, graceMs);
+    }
+
+
+    public long timeDifference() {

Review comment:
       nit: also rename the method `timeDifferenceMs` to be consistent with `gracePeriodMs`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
##########
@@ -275,6 +275,15 @@
      */
     <W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final Windows<W> windows);
 
+    /**
+     * Create a new {@link SessionWindowedCogroupedKStream} instance that can be used to perform session

Review comment:
       ```suggestion
        * Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform sliding
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
+    private final SlidingWindows windows;
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+
+    SlidingWindowedKStreamImpl(final SlidingWindows windows,
+                            final InternalStreamsBuilder builder,
+                            final Set<String> subTopologySourceNodes,
+                            final String name,
+                            final Serde<K> keySerde,
+                            final Serde<V> valueSerde,
+                            final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                            final StreamsGraphNode streamsGraphNode) {
+        super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder);
+        this.windows = Objects.requireNonNull(windows, "windows can't be null");
+        this.aggregateBuilder = aggregateBuilder;
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count() {
+        return count(NamedInternal.empty());
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Named named) {
+        return doCount(named, Materialized.with(keySerde, Serdes.Long()));
+    }
+
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        return count(NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Named named, final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+
+        // TODO: remove this when we do a topology-incompatible release
+        // we used to burn a topology name here, so we have to keep doing it for compatibility
+        if (new MaterializedInternal<>(materialized).storeName() == null) {
+            builder.newStoreName(AGGREGATE_NAME);
+        }

Review comment:
       I think it's ok to skip this; since it's a new operator, there's no old topology to be compatible with

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
+/**
+ /**
+ * A sliding window used for aggregating events.
+ * <p>
+ * Sliding Windows are defined based on a record's timestamp, window size based on the given maximum time difference (inclusive) between
+ * records in the same window and given window grace period.
+ *
+ * While the window is sliding over the input data stream, a new window is created each time a record enters
+ * the sliding window or a record drops out of the sliding window.
+ *
+ * Records that come after set grace period will be ignored, i.e., a window is closed when
+ * {@code stream-time > window-end + grace-period}.
+ * <p>
+ * For example, if we have a time difference of 5000ms and the following data arrives:
+ * <pre>
+ * +--------------------------------------+
+ * |    key    |    value    |    time    |
+ * +-----------+-------------+------------+
+ * |    A      |     1       |    8000    |
+ * +-----------+-------------+------------+
+ * |    A      |     2       |    9200    |
+ * +-----------+-------------+------------+
+ * |    A      |     3       |    12400   |
+ * +-----------+-------------+------------+
+ * </pre>
+ * We'd have the following 5 windows:
+ * - window {@code [3000;8000]} contains [1] (created when first record enters the window)
+ * - window {@code [4200;9200]} contains [1,2] (created when second record enters the window)
+ * - window {@code [7400;124000]} contains [1,2,3] (created when third record enters the window)
+ * - window {@code [8001;130001]} contains [2,3] (created when the first record drops out of the window)
+ * - window {@code [9201;142001]} contains [3] (created when the second record drops out of the window)
+ *
+ * Note that while SlidingWindows are of a fixed size {@link TimeWindows}, the start and end points
+ * depend on when events are processed, similar to {@link SessionWindows}.
+ * <p>
+ * For time semantics, see {@link TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see SessionWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see KGroupedStream#windowedBy(Windows)
+ * @see TimestampExtractor
+ */
+
+public final class SlidingWindows {
+
+
+    /** The size of the windows in milliseconds, defined by the max time difference between records. */
+    public final long timeDifference;
+
+    /** The grace period in milliseconds. */
+    private final long graceMs;
+
+    private SlidingWindows(final long timeDifference, final long graceMs) {
+        this.timeDifference = timeDifference;
+        this.graceMs = graceMs;
+
+    }
+
+    /**
+     * Return a window definition with the window size based on the given maximum time difference (inclusive) between
+     * records in the same window and given window grace period.
+     *
+     * Reject out-of-order events that arrive after {@code grace}. A window is closed when {@code stream-time > window-end + grace-period}.
+     *
+     * @param timeDifference the max time difference (inclusive) between two records in a window
+     * @param grace the grace period to admit out-of-order events to a window
+     * @return a new window definition
+     * @throws IllegalArgumentException if the specified window size or grace is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference, final Duration grace) throws IllegalArgumentException {
+        final String msgPrefixSize = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        final long timeDifferenceMs = ApiUtils.validateMillisecondDuration(timeDifference, msgPrefixSize);
+        if (timeDifferenceMs <= 0) {
+            throw new IllegalArgumentException("Window timeDifference (timeDifference) must be larger than zero.");
+        }
+        final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "afterWindowEnd");
+        final long graceMs = ApiUtils.validateMillisecondDuration(grace, msgPrefixGrace);
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
+        return new SlidingWindows(timeDifferenceMs, graceMs);
+    }
+
+
+    public long timeDifference() {
+        return timeDifference;
+    }
+
+
+
+
+
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode

Review comment:
       I think we can remove this suppression (and all the ones below)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
+/**
+ /**
+ * A sliding window used for aggregating events.
+ * <p>
+ * Sliding Windows are defined based on a record's timestamp, window size based on the given maximum time difference (inclusive) between
+ * records in the same window and given window grace period.
+ *
+ * While the window is sliding over the input data stream, a new window is created each time a record enters
+ * the sliding window or a record drops out of the sliding window.
+ *
+ * Records that come after set grace period will be ignored, i.e., a window is closed when
+ * {@code stream-time > window-end + grace-period}.
+ * <p>
+ * For example, if we have a time difference of 5000ms and the following data arrives:
+ * <pre>
+ * +--------------------------------------+
+ * |    key    |    value    |    time    |
+ * +-----------+-------------+------------+
+ * |    A      |     1       |    8000    |
+ * +-----------+-------------+------------+
+ * |    A      |     2       |    9200    |
+ * +-----------+-------------+------------+
+ * |    A      |     3       |    12400   |
+ * +-----------+-------------+------------+
+ * </pre>
+ * We'd have the following 5 windows:
+ * - window {@code [3000;8000]} contains [1] (created when first record enters the window)
+ * - window {@code [4200;9200]} contains [1,2] (created when second record enters the window)
+ * - window {@code [7400;124000]} contains [1,2,3] (created when third record enters the window)
+ * - window {@code [8001;130001]} contains [2,3] (created when the first record drops out of the window)
+ * - window {@code [9201;142001]} contains [3] (created when the second record drops out of the window)
+ *
+ * Note that while SlidingWindows are of a fixed size {@link TimeWindows}, the start and end points
+ * depend on when events are processed, similar to {@link SessionWindows}.
+ * <p>
+ * For time semantics, see {@link TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see SessionWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see KGroupedStream#windowedBy(Windows)
+ * @see TimestampExtractor
+ */
+
+public final class SlidingWindows {
+
+
+    /** The size of the windows in milliseconds, defined by the max time difference between records. */
+    public final long timeDifference;
+
+    /** The grace period in milliseconds. */
+    private final long graceMs;
+
+    private SlidingWindows(final long timeDifference, final long graceMs) {
+        this.timeDifference = timeDifference;
+        this.graceMs = graceMs;
+
+    }
+
+    /**
+     * Return a window definition with the window size based on the given maximum time difference (inclusive) between
+     * records in the same window and given window grace period.
+     *
+     * Reject out-of-order events that arrive after {@code grace}. A window is closed when {@code stream-time > window-end + grace-period}.
+     *
+     * @param timeDifference the max time difference (inclusive) between two records in a window
+     * @param grace the grace period to admit out-of-order events to a window
+     * @return a new window definition
+     * @throws IllegalArgumentException if the specified window size or grace is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference, final Duration grace) throws IllegalArgumentException {
+        final String msgPrefixSize = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        final long timeDifferenceMs = ApiUtils.validateMillisecondDuration(timeDifference, msgPrefixSize);
+        if (timeDifferenceMs <= 0) {
+            throw new IllegalArgumentException("Window timeDifference (timeDifference) must be larger than zero.");
+        }
+        final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "afterWindowEnd");
+        final long graceMs = ApiUtils.validateMillisecondDuration(grace, msgPrefixGrace);
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
+        return new SlidingWindows(timeDifferenceMs, graceMs);
+    }
+
+
+    public long timeDifference() {
+        return timeDifference;
+    }
+
+
+
+
+
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
+    public long gracePeriodMs() {
+        return graceMs;
+    }
+
+
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final SlidingWindows that = (SlidingWindows) o;
+        return  timeDifference == that.timeDifference &&

Review comment:
       nit: extra space after `return` 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
+    private final SlidingWindows windows;
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+
+    SlidingWindowedKStreamImpl(final SlidingWindows windows,
+                            final InternalStreamsBuilder builder,
+                            final Set<String> subTopologySourceNodes,
+                            final String name,
+                            final Serde<K> keySerde,
+                            final Serde<V> valueSerde,
+                            final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                            final StreamsGraphNode streamsGraphNode) {
+        super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder);
+        this.windows = Objects.requireNonNull(windows, "windows can't be null");
+        this.aggregateBuilder = aggregateBuilder;
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count() {
+        return count(NamedInternal.empty());
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Named named) {
+        return doCount(named, Materialized.with(keySerde, Serdes.Long()));
+    }
+
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        return count(NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Named named, final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+
+        // TODO: remove this when we do a topology-incompatible release
+        // we used to burn a topology name here, so we have to keep doing it for compatibility
+        if (new MaterializedInternal<>(materialized).storeName() == null) {
+            builder.newStoreName(AGGREGATE_NAME);
+        }
+
+        return doCount(named, materialized);
+    }
+
+    private KTable<Windowed<K>, Long> doCount(final Named named,
+                                              final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+        if (materializedInternal.valueSerde() == null) {
+            materializedInternal.withValueSerde(Serdes.Long());
+        }
+
+        final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
+        return aggregateBuilder.build(
+                new NamedInternal(aggregateName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference()) : null,
+                materializedInternal.valueSerde());
+
+
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator) {
+        return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Named named) {
+        return aggregate(initializer, aggregator, named, Materialized.with(keySerde, null));
+    }
+
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Named named,
+                                                  final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+
+        final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
+        return aggregateBuilder.build(
+                new NamedInternal(aggregateName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference()) : null,
+                materializedInternal.valueSerde());
+
+
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
+        return reduce(reducer, NamedInternal.empty());
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Named named) {
+        return reduce(reducer, named, Materialized.with(keySerde, valueSerde));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                         final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        return reduce(reducer, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                         final Named named,
+                                         final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
+
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+        if (materializedInternal.valueSerde() == null) {
+            materializedInternal.withValueSerde(valueSerde);
+        }
+
+        final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+
+        return aggregateBuilder.build(
+                new NamedInternal(reduceName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference()) : null,
+                materializedInternal.valueSerde());
+
+
+    }
+
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
+    private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) {
+            // new style retention: use Materialized retention and default segmentInterval

Review comment:
       let's just remove this comment since it's the only style retention here (also in `SlidingWindowedCogroupedKStreamImpl`)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time to the record
+                boolean foundFirst = false;
+                //if we've already seen the window with the closest end time to the record
+                boolean foundFirstEndTime = false;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    //determine if current record's right window exists
+                    if (next.key.window().start() == timestamp + 1) {
+                        rightWindowExists = true;
+                        continue;
+                    } else if (next.key.window().end() > timestamp) {
+                        if (!foundFirst) {
+                            foundFirst = true;
+                            if (isLeftWindow(next)) {
+                                foundLeftFirst = true;
+                            }
+                        }
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else if (next.key.window().end() == timestamp) {
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinExists = true;
+                        continue;
+                    } else {
+                        if (!foundFirstEndTime) {
+                            leftWinAgg = next.value;
+                            foundFirstEndTime = true;
+                        }
+                        if (prevRightWinExists) {
+                            break;
+                        }
+                        if (isLeftWindow(next)) {
+                            prevRightWinExists = true;
+                            final long rightWinStart = next.key.window().end() + 1;
+                            if (windowStartTimes.contains(rightWinStart)) {
+                                prevRightWinAlreadyCreated = true;
+                            } else {
+                                final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifference);
+                                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+                            }
+                        }
+                    }
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinExists) {
+                final Agg aggValue;
+                final long newTimestamp;
+                //confirms that the left window contains more than the current record
+                if (prevRightWinExists) {
+                    aggValue = aggregator.apply(key, value, getValueOrNull(leftWinAgg));
+                    newTimestamp = leftWinAgg.timestamp();
+                } else {
+                    //left window just contains the current record
+                    aggValue = aggregator.apply(key, value, initializer.apply());
+                    newTimestamp = timestamp;
+                }
+                final TimeWindow window = new TimeWindow(timestamp - windows.timeDifference, timestamp);
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(aggValue, Math.max(timestamp, newTimestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+            //create the right window for
+            if (!rightWindowExists && (foundLeftFirst || prevRightWinAlreadyCreated)) {
+                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifference);
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+        }
+
+        public void processInOrder(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinAlreadyCreated = false;
+
+
+            //to keep find the left type window closest to the record
+            Window latestLeftTypeWindow = null;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    //potentially need to change long to instant
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    final long endTime = next.key.window().end();
+                    final long startTime = next.key.window().start();
+
+                    if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        if (isLeftWindow(next)) {
+                            latestLeftTypeWindow = next.key.window();
+                        }
+                        continue;
+                    } else if (endTime == timestamp) {
+                        leftWinExists = true;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        foundLeftFirst = isLeftWindow(next) ? true : false;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else {
+                        rightWindowExists = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (latestLeftTypeWindow != null) {
+                final long rightWinStart = latestLeftTypeWindow.end() + 1;
+                if (windowStartTimes.contains(rightWinStart)) {
+                    prevRightWinAlreadyCreated = true;
+                } else {
+                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifference);
+                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+                }
+            }
+
+            //create left window for new record
+            if (!leftWinExists) {
+                final Agg aggValue;
+                final long newTimestamp;
+                if (latestLeftTypeWindow != null) {
+                    aggValue = aggregator.apply(key, value, getValueOrNull(leftWinAgg));
+                    newTimestamp = leftWinAgg.timestamp();
+                } else {
+                    aggValue = aggregator.apply(key, value, initializer.apply());
+                    newTimestamp = timestamp;
+                }
+                final TimeWindow window = new TimeWindow(timestamp - windows.timeDifference, timestamp);
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(aggValue, Math.max(timestamp, newTimestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+            //create right window for new record
+            if (!rightWindowExists && (foundLeftFirst || prevRightWinAlreadyCreated)) {
+                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifference);
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+
+        }
+
+        private boolean isLeftWindow(final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> window) {
+            return window.key.window().end() == window.value.timestamp();
+        }
+
+        private void putAndForward(final Window window, final ValueAndTimestamp<Agg> valueAndTime, final K key,
+                                   final V value, final long closeTime, final long timestamp) {
+            final long windowStart = window.start();
+            final long windowEnd = window.end();
+            if (windowEnd > closeTime) {
+                //get aggregate from existing window
+                final Agg oldAgg = getValueOrNull(valueAndTime);
+                //add record's value to existing aggregate
+                final Agg newAgg = aggregator.apply(key, value, oldAgg);
+
+                windowStore.put(key,
+                        ValueAndTimestamp.make(newAgg, Math.max(timestamp, valueAndTime.timestamp())),
+                        windowStart);
+                tupleForwarder.maybeForward(
+                        new Windowed<K>(key, window),
+                        newAgg,
+                        sendOldValues ? oldAgg : null,
+                        windowStart);

Review comment:
       I think we need to pass in the new maximum window timestamp here, not the window start time

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time to the record
+                boolean foundFirst = false;
+                //if we've already seen the window with the closest end time to the record
+                boolean foundFirstEndTime = false;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    //determine if current record's right window exists
+                    if (next.key.window().start() == timestamp + 1) {
+                        rightWindowExists = true;
+                        continue;
+                    } else if (next.key.window().end() > timestamp) {
+                        if (!foundFirst) {
+                            foundFirst = true;
+                            if (isLeftWindow(next)) {
+                                foundLeftFirst = true;
+                            }
+                        }
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else if (next.key.window().end() == timestamp) {
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinExists = true;
+                        continue;
+                    } else {
+                        if (!foundFirstEndTime) {
+                            leftWinAgg = next.value;
+                            foundFirstEndTime = true;
+                        }
+                        if (prevRightWinExists) {
+                            break;
+                        }
+                        if (isLeftWindow(next)) {
+                            prevRightWinExists = true;
+                            final long rightWinStart = next.key.window().end() + 1;
+                            if (windowStartTimes.contains(rightWinStart)) {
+                                prevRightWinAlreadyCreated = true;
+                            } else {
+                                final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifference);
+                                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+                            }
+                        }
+                    }
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinExists) {
+                final Agg aggValue;
+                final long newTimestamp;
+                //confirms that the left window contains more than the current record
+                if (prevRightWinExists) {
+                    aggValue = aggregator.apply(key, value, getValueOrNull(leftWinAgg));
+                    newTimestamp = leftWinAgg.timestamp();
+                } else {
+                    //left window just contains the current record
+                    aggValue = aggregator.apply(key, value, initializer.apply());
+                    newTimestamp = timestamp;
+                }
+                final TimeWindow window = new TimeWindow(timestamp - windows.timeDifference, timestamp);
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(aggValue, Math.max(timestamp, newTimestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+            //create the right window for
+            if (!rightWindowExists && (foundLeftFirst || prevRightWinAlreadyCreated)) {

Review comment:
       Can we put this condition into a method and give it a clear name to describe what this means? eg
   
   ```
   private boolean rightWindowHasNotBeenCreatedAndIsNonEmpty(..) {
       return !rightWindowExists && (foundLeftFirst || prevRightWinAlreadyCreated)
   }
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
##########
@@ -19,7 +19,16 @@
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.*;

Review comment:
       Not sure if you did this or your IDE did it automatically, but nice 👍 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
+/**
+ /**
+ * A sliding window used for aggregating events.
+ * <p>
+ * Sliding Windows are defined based on a record's timestamp, window size based on the given maximum time difference (inclusive) between
+ * records in the same window and given window grace period.
+ *
+ * While the window is sliding over the input data stream, a new window is created each time a record enters
+ * the sliding window or a record drops out of the sliding window.
+ *
+ * Records that come after set grace period will be ignored, i.e., a window is closed when
+ * {@code stream-time > window-end + grace-period}.
+ * <p>
+ * For example, if we have a time difference of 5000ms and the following data arrives:
+ * <pre>
+ * +--------------------------------------+
+ * |    key    |    value    |    time    |
+ * +-----------+-------------+------------+
+ * |    A      |     1       |    8000    |
+ * +-----------+-------------+------------+
+ * |    A      |     2       |    9200    |
+ * +-----------+-------------+------------+
+ * |    A      |     3       |    12400   |
+ * +-----------+-------------+------------+
+ * </pre>
+ * We'd have the following 5 windows:
+ * - window {@code [3000;8000]} contains [1] (created when first record enters the window)
+ * - window {@code [4200;9200]} contains [1,2] (created when second record enters the window)
+ * - window {@code [7400;124000]} contains [1,2,3] (created when third record enters the window)
+ * - window {@code [8001;130001]} contains [2,3] (created when the first record drops out of the window)
+ * - window {@code [9201;142001]} contains [3] (created when the second record drops out of the window)
+ *
+ * Note that while SlidingWindows are of a fixed size {@link TimeWindows}, the start and end points
+ * depend on when events are processed, similar to {@link SessionWindows}.
+ * <p>
+ * For time semantics, see {@link TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see SessionWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see KGroupedStream#windowedBy(Windows)
+ * @see TimestampExtractor
+ */
+
+public final class SlidingWindows {
+
+
+    /** The size of the windows in milliseconds, defined by the max time difference between records. */
+    public final long timeDifference;

Review comment:
       nit: call this `timeDifferenceMs` to be in sync with `graceMs`. Also it can be private

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
+    private final SlidingWindows windows;
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+
+    SlidingWindowedKStreamImpl(final SlidingWindows windows,
+                            final InternalStreamsBuilder builder,
+                            final Set<String> subTopologySourceNodes,
+                            final String name,
+                            final Serde<K> keySerde,
+                            final Serde<V> valueSerde,
+                            final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                            final StreamsGraphNode streamsGraphNode) {
+        super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder);
+        this.windows = Objects.requireNonNull(windows, "windows can't be null");
+        this.aggregateBuilder = aggregateBuilder;
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count() {
+        return count(NamedInternal.empty());
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Named named) {
+        return doCount(named, Materialized.with(keySerde, Serdes.Long()));
+    }
+
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        return count(NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Named named, final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+
+        // TODO: remove this when we do a topology-incompatible release
+        // we used to burn a topology name here, so we have to keep doing it for compatibility
+        if (new MaterializedInternal<>(materialized).storeName() == null) {
+            builder.newStoreName(AGGREGATE_NAME);
+        }

Review comment:
       I wonder why we have to do this for `count` but not for `aggregate` and `reduce`? Is this intentional or an oversight? cc @mjsax @vvcephei @guozhangwang 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class SlidingWindowedCogroupedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedCogroupedKStream<K, V> {
+    private final SlidingWindows windows;
+    private final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+    private final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns;
+
+    SlidingWindowedCogroupedKStreamImpl(final SlidingWindows windows,
+                                     final InternalStreamsBuilder builder,
+                                     final Set<String> subTopologySourceNodes,
+                                     final String name,
+                                     final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                                     final StreamsGraphNode streamsGraphNode,
+                                     final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns) {
+        super(name, null, null, subTopologySourceNodes, streamsGraphNode, builder);
+        //keySerde and valueSerde are null because there are many different groupStreams that they could be from
+        this.windows = windows;
+        this.aggregateBuilder = aggregateBuilder;
+        this.groupPatterns = groupPatterns;
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer) {
+        return aggregate(initializer, Materialized.with(null, null));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        return aggregate(initializer, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Named named) {
+        return aggregate(initializer, named, Materialized.with(null, null));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Named named,
+                                            final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(
+                materialized,
+                builder,
+                CogroupedKStreamImpl.AGGREGATE_NAME);
+        return aggregateBuilder.build(
+                groupPatterns,
+                initializer,
+                new NamedInternal(named),
+                materialize(materializedInternal),
+                materializedInternal.keySerde() != null ?
+                        new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference())
+                        : null,
+                materializedInternal.valueSerde(),
+                materializedInternal.queryableStoreName(),
+                null,
+                windows,
+                null,
+                null);
+    }
+
+    @SuppressWarnings("deprecation")
+    // continuing to support Windows#maintainMs/segmentInterval in fallback mode

Review comment:
       We can remove this 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -132,16 +135,19 @@
                                                                                     final boolean stateCreated,
                                                                                     final StoreBuilder<?> storeBuilder,
                                                                                     final Windows<W> windows,
+                                                                                    final SlidingWindows slidingWindows,
                                                                                     final SessionWindows sessionWindows,
                                                                                     final Merger<? super K, VOut> sessionMerger) {
 
         final ProcessorSupplier<K, ?> kStreamAggregate;
 
-        if (windows == null && sessionWindows == null) {
+        if (windows == null && slidingWindows == null && sessionWindows == null) {
             kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator);
-        } else if (windows != null && sessionWindows == null) {
+        } else if (windows != null && slidingWindows == null && sessionWindows == null) {
             kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator);
-        } else if (windows == null && sessionMerger != null) {
+        } else if (windows == null && slidingWindows != null && sessionWindows == null) {
+            kStreamAggregate = new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, aggregator);
+        } else if (windows == null && slidingWindows == null && sessionMerger != null) {

Review comment:
       Seems like this should have also had a check for `sessionWindows != null`, right? Can we add that as well?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {

Review comment:
       I was wondering what this method is actually used for so I checked out the callers of `KStreamWindowAggregate#windows`. There's a method called `extractGracePeriod` in `GraphGraceSearchUtil` where we might actually need to make a small addition to include the new sliding window processor.
   
   I think it's for Suppression, which needs to figure out the grace period of the upstream operator since grace period doesn't get passed in directly to `suppress`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time to the record
+                boolean foundFirst = false;

Review comment:
       Can we also name this variable a bit more clearly instead of the comment? Like `foundClosestStartTimeWindow` or something. Same with `foundFirstEndTime`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
+    private final SlidingWindows windows;
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+
+    SlidingWindowedKStreamImpl(final SlidingWindows windows,
+                            final InternalStreamsBuilder builder,

Review comment:
       nit: alignment is off by one on the parameters

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
+    private final SlidingWindows windows;
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+
+    SlidingWindowedKStreamImpl(final SlidingWindows windows,
+                            final InternalStreamsBuilder builder,
+                            final Set<String> subTopologySourceNodes,
+                            final String name,
+                            final Serde<K> keySerde,
+                            final Serde<V> valueSerde,
+                            final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                            final StreamsGraphNode streamsGraphNode) {
+        super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder);
+        this.windows = Objects.requireNonNull(windows, "windows can't be null");
+        this.aggregateBuilder = aggregateBuilder;
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count() {
+        return count(NamedInternal.empty());
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Named named) {
+        return doCount(named, Materialized.with(keySerde, Serdes.Long()));
+    }
+
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        return count(NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Named named, final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+
+        // TODO: remove this when we do a topology-incompatible release
+        // we used to burn a topology name here, so we have to keep doing it for compatibility
+        if (new MaterializedInternal<>(materialized).storeName() == null) {
+            builder.newStoreName(AGGREGATE_NAME);
+        }
+
+        return doCount(named, materialized);
+    }
+
+    private KTable<Windowed<K>, Long> doCount(final Named named,
+                                              final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+        if (materializedInternal.valueSerde() == null) {
+            materializedInternal.withValueSerde(Serdes.Long());
+        }
+
+        final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
+        return aggregateBuilder.build(
+                new NamedInternal(aggregateName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference()) : null,
+                materializedInternal.valueSerde());
+
+
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator) {
+        return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Named named) {
+        return aggregate(initializer, aggregator, named, Materialized.with(keySerde, null));
+    }
+
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Named named,
+                                                  final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+
+        final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
+        return aggregateBuilder.build(
+                new NamedInternal(aggregateName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference()) : null,
+                materializedInternal.valueSerde());
+
+
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
+        return reduce(reducer, NamedInternal.empty());
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Named named) {
+        return reduce(reducer, named, Materialized.with(keySerde, valueSerde));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                         final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        return reduce(reducer, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                         final Named named,
+                                         final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
+
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+        if (materializedInternal.valueSerde() == null) {
+            materializedInternal.withValueSerde(valueSerde);
+        }
+
+        final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+
+        return aggregateBuilder.build(
+                new NamedInternal(reduceName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference()) : null,
+                materializedInternal.valueSerde());
+
+
+    }
+
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode

Review comment:
       You should be able to remove this suppression and comment (here and in `SlidingWindowedCogroupedKStreamImpl`)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
+    private final SlidingWindows windows;
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+
+    SlidingWindowedKStreamImpl(final SlidingWindows windows,
+                            final InternalStreamsBuilder builder,
+                            final Set<String> subTopologySourceNodes,
+                            final String name,
+                            final Serde<K> keySerde,
+                            final Serde<V> valueSerde,
+                            final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                            final StreamsGraphNode streamsGraphNode) {
+        super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder);
+        this.windows = Objects.requireNonNull(windows, "windows can't be null");
+        this.aggregateBuilder = aggregateBuilder;
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count() {
+        return count(NamedInternal.empty());
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Named named) {
+        return doCount(named, Materialized.with(keySerde, Serdes.Long()));
+    }
+
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        return count(NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Named named, final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+
+        // TODO: remove this when we do a topology-incompatible release
+        // we used to burn a topology name here, so we have to keep doing it for compatibility
+        if (new MaterializedInternal<>(materialized).storeName() == null) {
+            builder.newStoreName(AGGREGATE_NAME);
+        }
+
+        return doCount(named, materialized);
+    }
+
+    private KTable<Windowed<K>, Long> doCount(final Named named,
+                                              final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+        if (materializedInternal.valueSerde() == null) {
+            materializedInternal.withValueSerde(Serdes.Long());
+        }
+
+        final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
+        return aggregateBuilder.build(
+                new NamedInternal(aggregateName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference()) : null,
+                materializedInternal.valueSerde());
+
+
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator) {
+        return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Named named) {
+        return aggregate(initializer, aggregator, named, Materialized.with(keySerde, null));
+    }
+
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Named named,
+                                                  final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+
+        final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
+        return aggregateBuilder.build(
+                new NamedInternal(aggregateName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference()) : null,
+                materializedInternal.valueSerde());
+
+
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
+        return reduce(reducer, NamedInternal.empty());
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Named named) {
+        return reduce(reducer, named, Materialized.with(keySerde, valueSerde));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                         final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        return reduce(reducer, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                         final Named named,
+                                         final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
+
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+        if (materializedInternal.valueSerde() == null) {
+            materializedInternal.withValueSerde(valueSerde);
+        }
+
+        final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+
+        return aggregateBuilder.build(
+                new NamedInternal(reduceName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference()) : null,
+                materializedInternal.valueSerde());
+
+
+    }
+
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
+    private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) {
+            // new style retention: use Materialized retention and default segmentInterval
+            final long retentionPeriod = materialized.retention().toMillis();
+
+            if ((windows.timeDifference() + windows.gracePeriodMs()) > retentionPeriod) {
+                throw new IllegalArgumentException("The retention period of the window store "
+                        + name + " must be no smaller than its window time difference plus the grace period."
+                        + " Got time difference=[" + windows.timeDifference() + "],"
+                        + " grace=[" + windows.gracePeriodMs() + "],"
+                        + " retention=[" + retentionPeriod + "]");
+            }
+
+            supplier = Stores.persistentTimestampedWindowStore(
+                    materialized.storeName(),
+                    Duration.ofMillis(retentionPeriod),
+                    Duration.ofMillis(windows.timeDifference()),
+                    false
+            );
+        }
+        final StoreBuilder<TimestampedWindowStore<K, VR>> builder = Stores.timestampedWindowStoreBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde()
+        );
+
+        if (materialized.loggingEnabled()) {
+            builder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        if (materialized.cachingEnabled()) {
+            builder.withCachingEnabled();
+        }

Review comment:
       Can we add an `else` here with `builder.withCachingDisabled()`? It doesn't make a difference logically, it just seems easier to understand (again, also in `SlidingWindowedCogroupedKStreamImpl`)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time to the record
+                boolean foundFirst = false;
+                //if we've already seen the window with the closest end time to the record
+                boolean foundFirstEndTime = false;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    //determine if current record's right window exists
+                    if (next.key.window().start() == timestamp + 1) {
+                        rightWindowExists = true;
+                        continue;
+                    } else if (next.key.window().end() > timestamp) {
+                        if (!foundFirst) {
+                            foundFirst = true;
+                            if (isLeftWindow(next)) {
+                                foundLeftFirst = true;
+                            }
+                        }
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else if (next.key.window().end() == timestamp) {
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinExists = true;
+                        continue;
+                    } else {
+                        if (!foundFirstEndTime) {
+                            leftWinAgg = next.value;
+                            foundFirstEndTime = true;
+                        }
+                        if (prevRightWinExists) {
+                            break;
+                        }
+                        if (isLeftWindow(next)) {
+                            prevRightWinExists = true;
+                            final long rightWinStart = next.key.window().end() + 1;
+                            if (windowStartTimes.contains(rightWinStart)) {
+                                prevRightWinAlreadyCreated = true;
+                            } else {
+                                final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifference);
+                                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+                            }
+                        }
+                    }
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinExists) {
+                final Agg aggValue;
+                final long newTimestamp;
+                //confirms that the left window contains more than the current record
+                if (prevRightWinExists) {
+                    aggValue = aggregator.apply(key, value, getValueOrNull(leftWinAgg));
+                    newTimestamp = leftWinAgg.timestamp();
+                } else {
+                    //left window just contains the current record
+                    aggValue = aggregator.apply(key, value, initializer.apply());
+                    newTimestamp = timestamp;
+                }
+                final TimeWindow window = new TimeWindow(timestamp - windows.timeDifference, timestamp);
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(aggValue, Math.max(timestamp, newTimestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+            //create the right window for
+            if (!rightWindowExists && (foundLeftFirst || prevRightWinAlreadyCreated)) {
+                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifference);
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+        }
+
+        public void processInOrder(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinAlreadyCreated = false;
+
+
+            //to keep find the left type window closest to the record
+            Window latestLeftTypeWindow = null;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    //potentially need to change long to instant
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    final long endTime = next.key.window().end();
+                    final long startTime = next.key.window().start();
+
+                    if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        if (isLeftWindow(next)) {
+                            latestLeftTypeWindow = next.key.window();
+                        }
+                        continue;
+                    } else if (endTime == timestamp) {
+                        leftWinExists = true;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        foundLeftFirst = isLeftWindow(next) ? true : false;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else {
+                        rightWindowExists = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (latestLeftTypeWindow != null) {
+                final long rightWinStart = latestLeftTypeWindow.end() + 1;
+                if (windowStartTimes.contains(rightWinStart)) {
+                    prevRightWinAlreadyCreated = true;
+                } else {
+                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifference);
+                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+                }
+            }
+
+            //create left window for new record
+            if (!leftWinExists) {
+                final Agg aggValue;
+                final long newTimestamp;
+                if (latestLeftTypeWindow != null) {
+                    aggValue = aggregator.apply(key, value, getValueOrNull(leftWinAgg));
+                    newTimestamp = leftWinAgg.timestamp();
+                } else {
+                    aggValue = aggregator.apply(key, value, initializer.apply());
+                    newTimestamp = timestamp;
+                }
+                final TimeWindow window = new TimeWindow(timestamp - windows.timeDifference, timestamp);
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(aggValue, Math.max(timestamp, newTimestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+            //create right window for new record
+            if (!rightWindowExists && (foundLeftFirst || prevRightWinAlreadyCreated)) {
+                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifference);
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+
+        }
+
+        private boolean isLeftWindow(final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> window) {
+            return window.key.window().end() == window.value.timestamp();
+        }
+
+        private void putAndForward(final Window window, final ValueAndTimestamp<Agg> valueAndTime, final K key,

Review comment:
       nit: put each parameter on its own line

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;

Review comment:
       Does that make sense? In particular I feel like we're using `Exists` to mean one thing for `left/RightWindowExists`, and then we mean another thing entirely in `prevRightWindowExists`. ie `prevRightWinAlreadyCreated` is more similar to what we mean by the `left/RightWindowExists` variables

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;

Review comment:
       In general it's better to use a more descriptive variable name than a shorted one with a comment. It's not always possible to describe a variable exactly in a reasonable length, but I think in this case we can say `curLeftWindowAlreadyExists` or `curLeftWindowAlreadyCreated` or something
   
   Might be better to use `AlreadyCreated` when we're specifically talking about whether or not a window already exists in the window store, and can use `Exists` when we're talking about whether a window is possible regardless of whether it currently has been created or not

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference

Review comment:
       this comment doesn't seem quite correct

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time to the record
+                boolean foundFirst = false;
+                //if we've already seen the window with the closest end time to the record
+                boolean foundFirstEndTime = false;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    //determine if current record's right window exists
+                    if (next.key.window().start() == timestamp + 1) {
+                        rightWindowExists = true;
+                        continue;

Review comment:
       maybe add a comment saying that this condition will only be hit on the very first record. Or it might be reasonable to pull this one condition out of the loop and just handle it before entering the loop

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time to the record
+                boolean foundFirst = false;
+                //if we've already seen the window with the closest end time to the record
+                boolean foundFirstEndTime = false;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    //determine if current record's right window exists
+                    if (next.key.window().start() == timestamp + 1) {
+                        rightWindowExists = true;
+                        continue;
+                    } else if (next.key.window().end() > timestamp) {
+                        if (!foundFirst) {
+                            foundFirst = true;
+                            if (isLeftWindow(next)) {
+                                foundLeftFirst = true;
+                            }
+                        }
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else if (next.key.window().end() == timestamp) {
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinExists = true;
+                        continue;
+                    } else {
+                        if (!foundFirstEndTime) {
+                            leftWinAgg = next.value;
+                            foundFirstEndTime = true;
+                        }
+                        if (prevRightWinExists) {
+                            break;
+                        }
+                        if (isLeftWindow(next)) {
+                            prevRightWinExists = true;
+                            final long rightWinStart = next.key.window().end() + 1;
+                            if (windowStartTimes.contains(rightWinStart)) {
+                                prevRightWinAlreadyCreated = true;
+                            } else {
+                                final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifference);
+                                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+                            }
+                        }
+                    }
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinExists) {
+                final Agg aggValue;
+                final long newTimestamp;
+                //confirms that the left window contains more than the current record
+                if (prevRightWinExists) {
+                    aggValue = aggregator.apply(key, value, getValueOrNull(leftWinAgg));

Review comment:
       Seems like we're aggregating with the new value twice; we call `aggregator.apply` once in this if/else branch but then also call it again in `putAndForward`, right? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time to the record
+                boolean foundFirst = false;
+                //if we've already seen the window with the closest end time to the record
+                boolean foundFirstEndTime = false;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    //determine if current record's right window exists
+                    if (next.key.window().start() == timestamp + 1) {
+                        rightWindowExists = true;
+                        continue;
+                    } else if (next.key.window().end() > timestamp) {
+                        if (!foundFirst) {
+                            foundFirst = true;
+                            if (isLeftWindow(next)) {
+                                foundLeftFirst = true;
+                            }
+                        }
+                        rightWinAgg = next.value;

Review comment:
       Should this be inside the `if (!foundFirst)` condition above? We only want to save the aggregate of the first window we find with a start time less than the timestamp right?
   
   Also, I think we might need to check that the max timestamp of this window is greater than the current record's timestamp. If not, then the right window will be empty.
   For example, we have a record A at 10 and a record B at 11 and then process a record at 15. Obviously, the new right window will be empty. But the first window we'll find with a start time less than 15 will be [11, 21] with agg B.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time to the record
+                boolean foundFirst = false;

Review comment:
       Actually maybe `foundRight/LeftWindowAggregate` would be good, since that's what "the window with the closest start/end time to the record" actually means to us

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time to the record
+                boolean foundFirst = false;
+                //if we've already seen the window with the closest end time to the record
+                boolean foundFirstEndTime = false;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    //determine if current record's right window exists
+                    if (next.key.window().start() == timestamp + 1) {
+                        rightWindowExists = true;
+                        continue;
+                    } else if (next.key.window().end() > timestamp) {
+                        if (!foundFirst) {
+                            foundFirst = true;
+                            if (isLeftWindow(next)) {
+                                foundLeftFirst = true;
+                            }
+                        }
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else if (next.key.window().end() == timestamp) {
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinExists = true;
+                        continue;
+                    } else {
+                        if (!foundFirstEndTime) {
+                            leftWinAgg = next.value;
+                            foundFirstEndTime = true;
+                        }
+                        if (prevRightWinExists) {
+                            break;

Review comment:
       Can we remove this and just `break` out of the loop immediately at the end of the `isLeftWindow` condition block? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time to the record
+                boolean foundFirst = false;
+                //if we've already seen the window with the closest end time to the record
+                boolean foundFirstEndTime = false;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    //determine if current record's right window exists
+                    if (next.key.window().start() == timestamp + 1) {
+                        rightWindowExists = true;
+                        continue;
+                    } else if (next.key.window().end() > timestamp) {
+                        if (!foundFirst) {
+                            foundFirst = true;
+                            if (isLeftWindow(next)) {
+                                foundLeftFirst = true;
+                            }
+                        }
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else if (next.key.window().end() == timestamp) {
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinExists = true;
+                        continue;
+                    } else {
+                        if (!foundFirstEndTime) {
+                            leftWinAgg = next.value;
+                            foundFirstEndTime = true;
+                        }
+                        if (prevRightWinExists) {
+                            break;
+                        }
+                        if (isLeftWindow(next)) {
+                            prevRightWinExists = true;
+                            final long rightWinStart = next.key.window().end() + 1;
+                            if (windowStartTimes.contains(rightWinStart)) {
+                                prevRightWinAlreadyCreated = true;
+                            } else {
+                                final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifference);
+                                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+                            }
+                        }
+                    }
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinExists) {
+                final Agg aggValue;
+                final long newTimestamp;
+                //confirms that the left window contains more than the current record
+                if (prevRightWinExists) {
+                    aggValue = aggregator.apply(key, value, getValueOrNull(leftWinAgg));
+                    newTimestamp = leftWinAgg.timestamp();
+                } else {
+                    //left window just contains the current record
+                    aggValue = aggregator.apply(key, value, initializer.apply());
+                    newTimestamp = timestamp;
+                }
+                final TimeWindow window = new TimeWindow(timestamp - windows.timeDifference, timestamp);
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(aggValue, Math.max(timestamp, newTimestamp));

Review comment:
       Since it's a left window, the max timestamp should always be `timestamp`, right? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
+    private final SlidingWindows windows;
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+
+    SlidingWindowedKStreamImpl(final SlidingWindows windows,
+                            final InternalStreamsBuilder builder,
+                            final Set<String> subTopologySourceNodes,
+                            final String name,
+                            final Serde<K> keySerde,
+                            final Serde<V> valueSerde,
+                            final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                            final StreamsGraphNode streamsGraphNode) {
+        super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder);
+        this.windows = Objects.requireNonNull(windows, "windows can't be null");
+        this.aggregateBuilder = aggregateBuilder;
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count() {
+        return count(NamedInternal.empty());
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Named named) {
+        return doCount(named, Materialized.with(keySerde, Serdes.Long()));
+    }
+
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        return count(NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Named named, final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+
+        // TODO: remove this when we do a topology-incompatible release
+        // we used to burn a topology name here, so we have to keep doing it for compatibility
+        if (new MaterializedInternal<>(materialized).storeName() == null) {
+            builder.newStoreName(AGGREGATE_NAME);
+        }
+
+        return doCount(named, materialized);
+    }
+
+    private KTable<Windowed<K>, Long> doCount(final Named named,
+                                              final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+        if (materializedInternal.valueSerde() == null) {
+            materializedInternal.withValueSerde(Serdes.Long());
+        }
+
+        final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
+        return aggregateBuilder.build(
+                new NamedInternal(aggregateName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference()) : null,
+                materializedInternal.valueSerde());
+
+
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator) {
+        return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Named named) {
+        return aggregate(initializer, aggregator, named, Materialized.with(keySerde, null));
+    }
+
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Named named,
+                                                  final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+
+        final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
+        return aggregateBuilder.build(
+                new NamedInternal(aggregateName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference()) : null,
+                materializedInternal.valueSerde());
+
+
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
+        return reduce(reducer, NamedInternal.empty());
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Named named) {
+        return reduce(reducer, named, Materialized.with(keySerde, valueSerde));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                         final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        return reduce(reducer, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                         final Named named,
+                                         final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
+
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
+        if (materializedInternal.valueSerde() == null) {
+            materializedInternal.withValueSerde(valueSerde);
+        }
+
+        final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+
+        return aggregateBuilder.build(
+                new NamedInternal(reduceName),
+                materialize(materializedInternal),
+                new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
+                materializedInternal.queryableStoreName(),
+                materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifference()) : null,
+                materializedInternal.valueSerde());
+
+
+    }
+
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
+    private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) {
+            // new style retention: use Materialized retention and default segmentInterval
+            final long retentionPeriod = materialized.retention().toMillis();
+
+            if ((windows.timeDifference() + windows.gracePeriodMs()) > retentionPeriod) {

Review comment:
       Do you think we actually need to enforce that the retention period be a little longer for sliding windows? I was just thinking that since the range scan starts at `timestamp - 2 * windows.timeDifference()`, maybe we should actually enforce that the retention period be >= `2 * timeDifference + gracePeriod` in case we need to get the aggregate value from some older window that has technically expired. 
   Haven't checked the math so I'm not sure that's the correct value exactly, but it seems like it might need to be a little bigger. Any thoughts?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time to the record
+                boolean foundFirst = false;
+                //if we've already seen the window with the closest end time to the record
+                boolean foundFirstEndTime = false;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    //determine if current record's right window exists
+                    if (next.key.window().start() == timestamp + 1) {
+                        rightWindowExists = true;
+                        continue;
+                    } else if (next.key.window().end() > timestamp) {
+                        if (!foundFirst) {
+                            foundFirst = true;
+                            if (isLeftWindow(next)) {
+                                foundLeftFirst = true;
+                            }
+                        }
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else if (next.key.window().end() == timestamp) {
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinExists = true;
+                        continue;
+                    } else {
+                        if (!foundFirstEndTime) {
+                            leftWinAgg = next.value;
+                            foundFirstEndTime = true;
+                        }
+                        if (prevRightWinExists) {
+                            break;
+                        }
+                        if (isLeftWindow(next)) {

Review comment:
       Can we add a comment to clarify that we're checking whether it's a left window because that tells us there was a record at this window's end time

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;
+            boolean prevRightWinAlreadyCreated = false;
+
+            try (
+                    //Fetch all the windows that have a start time between timestamp and timestamp+timeDifference
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(key,
+                            key,
+                            Instant.ofEpochMilli(timestamp - 2 * windows.timeDifference()),
+                            Instant.ofEpochMilli(timestamp + 1))
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time to the record
+                boolean foundFirst = false;
+                //if we've already seen the window with the closest end time to the record
+                boolean foundFirstEndTime = false;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    //determine if current record's right window exists
+                    if (next.key.window().start() == timestamp + 1) {
+                        rightWindowExists = true;
+                        continue;
+                    } else if (next.key.window().end() > timestamp) {
+                        if (!foundFirst) {
+                            foundFirst = true;
+                            if (isLeftWindow(next)) {
+                                foundLeftFirst = true;
+                            }
+                        }
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else if (next.key.window().end() == timestamp) {
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinExists = true;
+                        continue;
+                    } else {
+                        if (!foundFirstEndTime) {
+                            leftWinAgg = next.value;
+                            foundFirstEndTime = true;
+                        }
+                        if (prevRightWinExists) {
+                            break;
+                        }
+                        if (isLeftWindow(next)) {
+                            prevRightWinExists = true;
+                            final long rightWinStart = next.key.window().end() + 1;
+                            if (windowStartTimes.contains(rightWinStart)) {
+                                prevRightWinAlreadyCreated = true;
+                            } else {
+                                final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifference);
+                                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+                            }
+                        }
+                    }
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinExists) {
+                final Agg aggValue;
+                final long newTimestamp;
+                //confirms that the left window contains more than the current record
+                if (prevRightWinExists) {

Review comment:
       Ok I may have lost the trail of logic here...are we just checking `prevRightWinExists` as an indicator of whether we actually found any records to the left of our record within range? Could/should we check `foundFirstEndTime` instead?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+            if (key == null) {
+                log.warn(
+                        "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            // flag to help determine if out-of-order record’s right window is non-empty
+            boolean foundLeftFirst = false;
+            //if current record's left/right windows already exist
+            boolean leftWinExists = false;
+            boolean rightWindowExists = false;
+            //to determine if we're creating the previous record's right window, helps with determining empty windows
+            boolean prevRightWinExists = false;

Review comment:
       I feel like I'm just way overthinking this, but I keep getting these variables confused. Maybe we could call this guy `prevRightWindowCanExist`? Does that seem to get at its underlying purpose?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org